
Slack integration (#28)
* Add Slack integration and fix private variable issue

* Update README with new information
lalitpagaria committed Feb 3, 2021
1 parent 3d946fb commit 7a0f6ab
Showing 17 changed files with 231 additions and 142 deletions.
7 changes: 4 additions & 3 deletions README.md
@@ -96,17 +96,18 @@ Following environment variables are useful to customize various parameters -

  - **Source/Observer**: Twitter, Play Store Reviews, Apple App Store Reviews, Sub Reddit (Facebook, Instagram, Google reviews, Amazon reviews, Slack, Microsoft Teams, Chat-bots etc planned in future)
  - **Analyzer/Segmenter**: Sentiment and Text classification (QA, Natural Search, FAQ, NER etc planned in future)
- - **Sink/Informer**: HTTP API, ElasticSearch, DailyGet, and Jira (Salesforce, Zendesk, Hubspot, Slack, Microsoft Teams, etc planned in future)
+ - **Sink/Informer**: HTTP API, ElasticSearch, DailyGet, Slack and Jira (Salesforce, Zendesk, Hubspot, Microsoft Teams, etc planned in future)
  - **Processor/WorkflowEngine**: Simple integration between Source, Analyzer and Sink (Rich workflows using rule engine planned in future)
  - **Convertor**: A very important part, which converts data from the analyzer format to the format the sink understands. It is very helpful for customizations; refer to `dailyget_sink.py` and `jira_sink.py`.

  **Note:** To use some integrations you need credentials; refer to the following list -
  - [Twitter](https://twitter.com/): To make authorized API calls, get access from the [dev portal](https://developer.twitter.com/en/apply-for-access). Read about the [search api](https://developer.twitter.com/en/docs/twitter-api/tweets/search/introduction) for more details.
  - [Play Store](https://play.google.com/): To make authorized API calls, get [service account's credentials](https://developers.google.com/identity/protocols/oauth2/service-account). Read about the [review api](https://googleapis.github.io/google-api-python-client/docs/dyn/androidpublisher_v3.reviews.html) for more details.
- - [Reddit](https://praw.readthedocs.io/en/latest/getting_started/authentication.html): To make authorized API calls, create a [client app](https://www.reddit.com/prefs/apps).
+ - [Reddit](https://www.reddit.com/): To make authorized API calls, create a [client app](https://www.reddit.com/prefs/apps). For more details, refer to [this link](https://praw.readthedocs.io/en/latest/getting_started/authentication.html).
+ - [Slack](https://slack.com/): To send messages to a Slack channel, get a bot or user token. Refer to [this link](https://api.slack.com/authentication/token-types#bot).

  ## Model selection
- Any model listed in [Text Classification](https://huggingface.co/models?filter=text-classification) or [Zero-Shot Classification](https://huggingface.co/models?filter=zero-shot-classification) can be used in Segmenter.
+ Any model listed in [Named Entity Recognition](https://huggingface.co/models?filter=token-classification), [Text Classification](https://huggingface.co/models?filter=text-classification) and [Zero-Shot Classification](https://huggingface.co/models?filter=zero-shot-classification) can be used in Segmenter.

  ## Examples and Screenshots
  Refer to the [example](https://github.com/lalitpagaria/obsei/tree/master/example) and [config](https://github.com/lalitpagaria/obsei/tree/master/config) folders for `obsei` usage and configurations.
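A note on the Convertor named in the README hunk above: it maps AnalyzerResponse objects into the payload shape a sink expects. Below is a minimal sketch of a custom convertor; it is not part of this commit, and the convert signature plus the processed_text field are assumed from how SlackSink calls self.convertor.convert(analyzer_response=...) later in this diff.

from typing import Any, Dict

from obsei.analyzer.base_analyzer import AnalyzerResponse
from obsei.sink.base_sink import Convertor


class UppercaseConvertor(Convertor):  # hypothetical convertor for illustration
    def convert(self, analyzer_response: AnalyzerResponse, **kwargs) -> Dict[str, Any]:
        # Shape the analyzer output into whatever the target sink understands.
        return {"processed_text": analyzer_response.processed_text.upper()}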
4 changes: 2 additions & 2 deletions example/reddit_scrapper_example.py
@@ -18,10 +18,10 @@ def print_state(id: str):
  logger = logging.getLogger(__name__)
  logging.basicConfig(stream=sys.stdout, level=logging.INFO)

- since_time = datetime.utcnow().astimezone(pytz.utc) + timedelta(days=1)
+ since_time = datetime.utcnow().astimezone(pytz.utc) + timedelta(days=-1)

  source_config = RedditScrapperConfig(
-     urls=["https://www.reddit.com/r/wallstreetbets/comments/.rss?sort=new"],
+     url="https://www.reddit.com/r/wallstreetbets/comments/.rss?sort=new",
      user_agent="testscript by u/FitStatistician7378",
      lookup_period=since_time.strftime(DATETIME_STRING_PATTERN)
  )
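The sign flip above is the actual bug fix: timedelta(days=1) placed since_time a day in the future, so the lookup window could match nothing. A standalone sketch of the corrected computation, mirroring the example's code (the strftime pattern here is illustrative only; the example uses obsei's DATETIME_STRING_PATTERN):

from datetime import datetime, timedelta

import pytz

# A negative delta looks back one day; a positive one would start the
# window in the future and the scrapper would return no posts.
since_time = datetime.utcnow().astimezone(pytz.utc) + timedelta(days=-1)
print(since_time.strftime("%Y-%m-%dT%H:%M:%S%z"))  # illustrative pattern only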
62 changes: 62 additions & 0 deletions example/slack_example.py
@@ -0,0 +1,62 @@
import logging
import os
import sys
from datetime import datetime, timedelta

import pytz

from obsei.analyzer.dummy_analyzer import DummyAnalyzer, DummyAnalyzerConfig
from obsei.misc.utils import DATETIME_STRING_PATTERN
from obsei.processor import Processor
from obsei.sink.slack_sink import SlackSink, SlackSinkConfig
from obsei.source.reddit_source import RedditConfig, RedditSource
from obsei.workflow.store import WorkflowStore
from obsei.workflow.workflow import Workflow, WorkflowConfig


def print_state(id: str):
    logger.info(f'Source State: {source.store.get_source_state(id)}')


logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

since_time = datetime.utcnow().astimezone(pytz.utc) + timedelta(hours=-1)

workflow_store = WorkflowStore()

source_config = RedditConfig(
    subreddits=["wallstreetbets"],
    lookup_period=since_time.strftime(DATETIME_STRING_PATTERN)
)

source = RedditSource(store=workflow_store)

sink_config = SlackSinkConfig(
    slack_token=os.environ['SLACK_TOKEN'],
    channel_id="C01LRS6CT9Q"
)
sink = SlackSink(store=workflow_store)

analyzer_config = DummyAnalyzerConfig()
analyzer = DummyAnalyzer()

workflow = Workflow(
    config=WorkflowConfig(
        source_config=source_config,
        sink_config=sink_config,
        analyzer_config=analyzer_config
    ),
)
workflow_store.add_workflow(workflow)

processor = Processor(
    analyzer=analyzer,
    sink=sink,
    source=source,
    analyzer_config=analyzer_config
)

processor.process(workflow=workflow)

print_state(workflow.id)
Binary file modified images/Obsei-flow-diagram.png
10 changes: 4 additions & 6 deletions obsei/analyzer/classification_analyzer.py
@@ -1,7 +1,8 @@
  import logging
  from typing import Any, Dict, List, Optional

- from transformers import pipeline
+ from pydantic import PrivateAttr
+ from transformers import Pipeline, pipeline

  from obsei.analyzer.base_analyzer import AnalyzerRequest, AnalyzerResponse, BaseAnalyzer, BaseAnalyzerConfig

@@ -15,16 +16,13 @@ class ClassificationAnalyzerConfig(BaseAnalyzerConfig):


  class ZeroShotClassificationAnalyzer(BaseAnalyzer):
-     __slots__ = ('_pipeline',)
+     _pipeline: Pipeline = PrivateAttr()
      TYPE: str = "Classification"
      model_name_or_path: str

      def __init__(self, **data: Any):
          super().__init__(**data)
-         object.__setattr__(
-             self,
-             '_pipeline', pipeline("zero-shot-classification", model=self.model_name_or_path)
-         )
+         self._pipeline = pipeline("zero-shot-classification", model=self.model_name_or_path)

      def _classify_text_from_model(
          self, text: str,
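This file shows the "private variable issue" fix from the commit message, repeated in the analyzers and sink configs below: the __slots__ + object.__setattr__ workaround becomes a declared pydantic PrivateAttr, which keeps the member out of the model's fields and serialized output. A minimal sketch of the pattern, assuming pydantic v1 as used here (the class and attribute names are hypothetical):

from typing import Any

from pydantic import BaseModel, PrivateAttr


class ExampleAnalyzer(BaseModel):
    _client: Any = PrivateAttr()  # private: excluded from fields and .dict()
    model_name: str

    def __init__(self, **data: Any):
        super().__init__(**data)
        # Private attributes can be assigned directly; no object.__setattr__ dance.
        self._client = f"loaded:{self.model_name}"


analyzer = ExampleAnalyzer(model_name="demo")
print(analyzer.dict())  # {'model_name': 'demo'} -- _client is not exposed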
19 changes: 9 additions & 10 deletions obsei/analyzer/ner_analyzer.py
@@ -1,15 +1,16 @@
  import logging
  from typing import Any, Dict, List, Optional

- from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline
+ from pydantic import PrivateAttr
+ from transformers import AutoModelForTokenClassification, AutoTokenizer, Pipeline, pipeline

  from obsei.analyzer.base_analyzer import AnalyzerRequest, AnalyzerResponse, BaseAnalyzer, BaseAnalyzerConfig

  logger = logging.getLogger(__name__)


  class NERAnalyzer(BaseAnalyzer):
-     __slots__ = ('_pipeline',)
+     _pipeline: Pipeline = PrivateAttr()
      TYPE: str = "NER"
      model_name_or_path: str
      tokenizer_name: Optional[str] = None
@@ -23,14 +24,12 @@ def __init__(self, **data: Any):
              tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_name, use_fast=True)
          else:
              tokenizer = None
-         object.__setattr__(
-             self,
-             '_pipeline', pipeline(
-                 'ner',
-                 model=model,
-                 tokenizer=tokenizer,
-                 grouped_entities=self.grouped_entities
-             )
+
+         self._pipeline = pipeline(
+             'ner',
+             model=model,
+             tokenizer=tokenizer,
+             grouped_entities=self.grouped_entities
          )

      def _classify_text_from_model(self, text: str) -> Dict[str, float]:
8 changes: 3 additions & 5 deletions obsei/analyzer/sentiment_analyzer.py
@@ -1,6 +1,7 @@
  import logging
  from typing import Any, List

+ from pydantic import PrivateAttr
  from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

  from obsei.analyzer.base_analyzer import AnalyzerRequest, AnalyzerResponse, BaseAnalyzer, BaseAnalyzerConfig
@@ -10,15 +11,12 @@


  class VaderSentimentAnalyzer(BaseAnalyzer):
-     __slots__ = ('_model',)
+     _model: SentimentIntensityAnalyzer = PrivateAttr()
      TYPE: str = "Sentiment"

      def __init__(self, **data: Any):
          super().__init__(**data)
-         object.__setattr__(
-             self,
-             '_model', SentimentIntensityAnalyzer()
-         )
+         self._model = SentimentIntensityAnalyzer()

      def _get_sentiment_score_from_vader(self, text: str) -> float:
          scores = self._model.polarity_scores(text)
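For context on the scores dict above: VADER's polarity_scores returns neg/neu/pos ratios plus a normalized compound score in [-1, 1]. A quick standalone check (how _get_sentiment_score_from_vader reduces this dict to a single float is outside this hunk):

from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

model = SentimentIntensityAnalyzer()
scores = model.polarity_scores("The new Slack sink works great!")
print(scores)               # {'neg': 0.0, 'neu': ..., 'pos': ..., 'compound': ...}
print(scores["compound"])   # single float in [-1, 1]; positive for this sentence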
10 changes: 3 additions & 7 deletions obsei/configuration.py
@@ -4,7 +4,7 @@
  from hydra.experimental import compose, initialize
  from hydra.utils import instantiate
  from omegaconf import DictConfig, OmegaConf
- from pydantic import BaseSettings, Field, constr
+ from pydantic import BaseSettings, Field, PrivateAttr, constr

  from obsei.analyzer.base_analyzer import BaseAnalyzer, BaseAnalyzerConfig
  from obsei.sink.dailyget_sink import DailyGetSinkConfig
@@ -18,18 +18,14 @@


  class ObseiConfiguration(BaseSettings):
-     __slots__ = ('configuration',)
+     configuration: DictConfig = PrivateAttr()
      config_path: constr(min_length=1) = Field(None, env='obsei_config_path')
      config_filename: constr(min_length=1) = Field(None, env='obsei_config_filename')

      def __init__(self, **data: Any):
          super().__init__(**data)
          with initialize(config_path=self.config_path):
-             object.__setattr__(
-                 self,
-                 'configuration',
-                 compose(self.config_filename)
-             )
+             self.configuration = compose(self.config_filename)
          logger.debug("Configuration: \n" + OmegaConf.to_yaml(self.configuration))

      def initialize_instance(self, key_name: str = None):
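ObseiConfiguration now assigns the composed hydra config to a private attribute and exposes pieces of it through initialize_instance. A hedged usage sketch — the path, file name, and key below are hypothetical; see the config folder linked in the README for real layouts:

from obsei.configuration import ObseiConfiguration

# Hypothetical values for illustration; config_path is relative, per hydra's initialize().
obsei_configuration = ObseiConfiguration(
    config_path="../config",
    config_filename="example.yaml",
)
analyzer_config = obsei_configuration.initialize_instance("analyzer_config")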
4 changes: 2 additions & 2 deletions obsei/sink/base_sink.py
@@ -35,8 +35,8 @@ class Config:


  class BaseSink(BaseModel):
-     convertor: Convertor
-     store: BaseStore
+     convertor: Optional[Convertor] = None
+     store: Optional[BaseStore] = None

      def __init__(self, **data: Any):
          super().__init__(**data)
22 changes: 9 additions & 13 deletions obsei/sink/elasticsearch_sink.py
@@ -4,15 +4,15 @@
  from elasticsearch import Elasticsearch
  from elasticsearch.helpers import bulk
  from elasticsearch.exceptions import RequestError
- from pydantic import Field, SecretStr
+ from pydantic import Field, PrivateAttr, SecretStr

  from obsei.sink.base_sink import BaseSink, BaseSinkConfig, Convertor
  from obsei.analyzer.base_analyzer import AnalyzerResponse


  class ElasticSearchSinkConfig(BaseSinkConfig):
      # This is done to avoid exposing member to API response
-     __slots__ = ('_es_client',)
+     _es_client: Elasticsearch = PrivateAttr()
      TYPE: str = "Elasticsearch"
      host: str
      port: int
@@ -30,17 +30,13 @@ class ElasticSearchSinkConfig(BaseSinkConfig):

      def __init__(self, **data: Any):
          super().__init__(**data)
-         object.__setattr__(
-             self,
-             '_es_client',
-             Elasticsearch(
-                 hosts=[{"host": self.host, "port": self.port}],
-                 http_auth=(self.username.get_secret_value(), self.password.get_secret_value()),
-                 scheme=self.scheme,
-                 ca_certs=self.ca_certs,
-                 verify_certs=self.verify_certs,
-                 timeout=self.timeout
-             )
+         self._es_client = Elasticsearch(
+             hosts=[{"host": self.host, "port": self.port}],
+             http_auth=(self.username.get_secret_value(), self.password.get_secret_value()),
+             scheme=self.scheme,
+             ca_certs=self.ca_certs,
+             verify_certs=self.verify_certs,
+             timeout=self.timeout
          )
          self.base_payload = self.base_payload or {
              "_op_type": "create",  # TODO update existing support?
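The base_payload defaulting shown above seeds each bulk action with "_op_type": "create". For reference, elasticsearch.helpers.bulk consumes action dicts that combine such metadata with the document body — a minimal sketch with hypothetical host, index, and document:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es_client = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])  # hypothetical host

actions = [
    {
        "_op_type": "create",      # fails if the id already exists; "index" would overwrite
        "_index": "obsei-events",  # hypothetical index name
        "_source": {"processed_text": "example event"},
    }
]
success_count, errors = bulk(es_client, actions)  # returns (success count, error list)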
18 changes: 7 additions & 11 deletions obsei/sink/jira_sink.py
@@ -4,7 +4,7 @@
  from typing import Any, Dict, List, Optional

  from atlassian import Jira
- from pydantic import SecretStr
+ from pydantic import PrivateAttr, SecretStr

  from obsei.sink.base_sink import BaseSink, BaseSinkConfig, Convertor
  from obsei.analyzer.base_analyzer import AnalyzerResponse
@@ -43,7 +43,7 @@ def convert(

  class JiraSinkConfig(BaseSinkConfig):
      # This is done to avoid exposing member to API response
-     __slots__ = ('_jira_client',)
+     _jira_client: Jira = PrivateAttr()
      TYPE: str = "Jira"
      url: str
      username: SecretStr
@@ -57,15 +57,11 @@ class JiraSinkConfig(BaseSinkConfig):

      def __init__(self, **data: Any):
          super().__init__(**data)
-         object.__setattr__(
-             self,
-             '_jira_client',
-             Jira(
-                 url=self.url,
-                 username=self.username.get_secret_value(),
-                 password=self.password.get_secret_value(),
-                 verify_ssl=self.verify_ssl,
-             )
+         self._jira_client = Jira(
+             url=self.url,
+             username=self.username.get_secret_value(),
+             password=self.password.get_secret_value(),
+             verify_ssl=self.verify_ssl,
          )

      def get_jira_client(self):
54 changes: 54 additions & 0 deletions obsei/sink/slack_sink.py
@@ -0,0 +1,54 @@
import logging
from typing import Any, List

from pydantic import Field, PrivateAttr, SecretStr
from slack_sdk import WebClient

from obsei.sink.base_sink import BaseSink, BaseSinkConfig, Convertor
from obsei.analyzer.base_analyzer import AnalyzerResponse

logger = logging.getLogger(__name__)


class SlackSinkConfig(BaseSinkConfig):
    # This is done to avoid exposing member to API response
    _slack_client: WebClient = PrivateAttr()
    TYPE: str = "Slack"

    slack_token: SecretStr = Field(None, env='SLACK_TOKEN')
    channel_id: str

    def __init__(self, **data: Any):
        super().__init__(**data)
        self._slack_client = WebClient(token=self.slack_token.get_secret_value())

    def get_slack_client(self):
        return self._slack_client


class SlackSink(BaseSink):
    def __init__(self, **data: Any):
        super().__init__(**data)

    def send_data(
        self,
        analyzer_responses: List[AnalyzerResponse],
        config: SlackSinkConfig,
        **kwargs
    ):
        responses = []
        payloads = []
        for analyzer_response in analyzer_responses:
            payloads.append(self.convertor.convert(
                analyzer_response=analyzer_response
            ))

        for payload in payloads:
            response = config.get_slack_client().chat_postMessage(
                channel=config.channel_id,
                text=f'```\n{payload["processed_text"]}\n```'
            )
            logger.info(f"response='{response}'")
            responses.append(response)

        return responses
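SlackSink wraps each converted payload in triple backticks so it renders as a code block in Slack. A standalone sketch of the same slack_sdk call (token and channel are placeholders; the bot must be a member of the target channel):

import os

from slack_sdk import WebClient

client = WebClient(token=os.environ["SLACK_TOKEN"])  # xoxb-... bot token
response = client.chat_postMessage(
    channel="C01LRS6CT9Q",  # placeholder channel id, as in the example above
    text="```\nprocessed text goes here\n```",  # fenced, matching SlackSink's formatting
)
print(response["ok"])  # True on success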