# Paths
* `/mnt/stage-zone/`: Stage zone
* `/mnt/user-linebot/`: User zone for LINE bot data
* `/mnt/code/`: Code bucket (unused)

**NOTE**: when accessing a path via Python, prepend `/dbfs` to the path. Example: `/dbfs/mnt/stage-zone/`

In [2]:
# Imports
import os
import glob
import logging
from pyspark.sql.types import *
from pyspark.sql import Row
from azure.cognitiveservices.language.textanalytics import TextAnalyticsClient
from msrest.authentication import CognitiveServicesCredentials
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail

# Schema of the staged LINE bot csv files.
# Each (column name, Spark type) pair becomes one nullable field, in this order.
linebot_schema = StructType([
  StructField(column_name, column_type(), True)
  for column_name, column_type in (
    ('message_id', LongType),
    ('message_type', StringType),
    ('message_text', StringType),
    ('timestamp', LongType),
    ('type', StringType),
    ('replyToken', StringType),
    ('source_userid', StringType),
    ('source_type', StringType),
    ('mode', StringType),
    ('published_timestamp', TimestampType),
    ('loaded_timestamp', TimestampType),
  )
])

# Text Analytics API settings, taken from environment variables
# (each is None when the corresponding variable is not set)
subscription_key = os.environ.get('TEXT_ANALYTICS_API_KEY')
endpoint = os.environ.get('TEXT_ANALYTICS_ENDPOINT')

In [3]:
# Functions
def authenticateClient():
  """Build a Text Analytics API client.

  Uses the module-level `subscription_key` for the credentials and the
  module-level `endpoint` as the service endpoint.
  """
  return TextAnalyticsClient(
    endpoint=endpoint,
    credentials=CognitiveServicesCredentials(subscription_key),
  )

def language_detection(df):
  """Add detected-language columns to a data frame of messages.

  All rows of `df` are collected and sent to the Text Analytics API in a
  single request, with `message_id` as the document id and `message_text`
  as the document text. For every document in the response, the first entry
  of its detected languages provides the new columns `language`,
  `language_short` and `language_score`.

  Returns `df` joined with those columns on `message_id`.
  """
  client = authenticateClient()

  # One request document per collected row
  request_docs = [
    {'id': record['message_id'], 'text': record['message_text']}
    for record in df.collect()
  ]
  api_response = client.detect_language(documents=request_docs)

  # Only the first detected language of each document is kept
  language_rows = []
  for result in api_response.documents:
    top_language = result.detected_languages[0]
    language_rows.append(Row(
      id=result.id,
      language=top_language.name,
      language_short=top_language.iso6391_name,
      language_score=top_language.score,
    ))

  # Turn the API results into a data frame keyed like the input, then join
  language_df = spark.createDataFrame(language_rows).withColumnRenamed('id', 'message_id')
  return df.join(language_df, on=['message_id'])

  
def sentiment(df):
  """Add sentiment columns to a data frame of messages.

  All rows of `df` are collected and sent to the Text Analytics API in a
  single request, with `message_id` as the document id, `language_short` as
  the document language and `message_text` as the document text.

  Each returned score is stored as `sentiment_score` and mapped to a
  `sentiment` label:
    Positive: score >= 0.75
    Neutral:  0.25 <= score < 0.75
    Negative: score < 0.25

  Returns `df` joined with the two new columns on `message_id`.
  """
  def categorize(score):
    # Thresholds are checked from highest to lowest
    if score >= 0.75:
      return 'Positive'
    if score >= 0.25:
      return 'Neutral'
    return 'Negative'

  client = authenticateClient()

  # One request document per collected row
  request_docs = [
    {
      'id': record['message_id'],
      'language': record['language_short'],
      'text': record['message_text'],
    }
    for record in df.collect()
  ]
  api_response = client.sentiment(documents=request_docs)

  sentiment_rows = [
    Row(
      id=result.id,
      sentiment=categorize(result.score),
      sentiment_score=float(result.score),
    )
    for result in api_response.documents
  ]

  # Turn the API results into a data frame keyed like the input, then join
  sentiment_df = spark.createDataFrame(sentiment_rows).withColumnRenamed('id', 'message_id')
  return df.join(sentiment_df, on=['message_id'])
        

def transform(dbfs_path, local_path):
  """ETL flow: read a staged csv file, enrich it and export it to the user zone.

  The file is read with Spark using `linebot_schema`, reduced to the columns
  needed downstream, enriched by `language_detection` and `sentiment`, and
  written as a csv file to the path obtained by replacing "stage-zone" with
  "user-linebot" in `dbfs_path`.

        dbfs_path: full path to .csv file with /dbfs prefix
        local_path: mounted path starting with /mnt
  """
  # Extract
  stage_df = spark.read.csv(local_path, header=True, schema=linebot_schema)
  # Transform
  limited_df = stage_df.select('message_id', 'source_userid', 'message_text', 'published_timestamp', 'loaded_timestamp')
  limited_df = language_detection(limited_df)
  limited_df = sentiment(limited_df)
  # Load
  dbfs_user_path = dbfs_path.replace("stage-zone", "user-linebot")
  # Make directories, otherwise raises exception when exporting.
  # os.path.dirname strips only the final path component; a str.replace of the
  # file name would also alter any earlier part of the path containing it.
  os.makedirs(os.path.dirname(dbfs_user_path), exist_ok=True)
  # quoting=1 is csv.QUOTE_ALL: every field is quoted in the output file
  limited_df.toPandas().to_csv(dbfs_user_path, header=True, index=False, quoting=1)
  

def send_email(subject, message):
  """Send a notification email through the SendGrid service.

  Sender and recipient addresses are fixed. `subject` is the subject line
  and `message` is used as the HTML content of the mail. The API key is read
  from the SENDGRID_API_KEY environment variable.

  Any exception raised while sending is printed instead of propagated.
  """
  notification = Mail(
    from_email='kirill@stereowind.com',
    to_emails='stereowind@gmail.com',
    subject=subject,
    html_content=message)
  try:
    api_key = os.getenv('SENDGRID_API_KEY')
    SendGridAPIClient(api_key).send(notification)
    print("Email notification sent!")
  except Exception as err:
    # Best effort: a failed notification is only reported
    print("Error occured while trying to send an email notification:")
    print(str(err))

In [4]:
# Main

# Work out which stage-zone files still have to be processed:
#   all_files: paths (with /dbfs prefix) of all .csv files currently in stage zone
#   all_df: the same paths as a single-column data frame
#   processed_files: paths recorded in datalake.processed_files_linebot
#   to_process: paths in all_df that are not in processed_files
all_files = glob.glob("/dbfs/mnt/stage-zone/" + "**/*.csv", recursive=True)
all_df = spark.createDataFrame(all_files, StringType()).withColumnRenamed('value', 'processed_files')
processed_files = spark.table('datalake.processed_files_linebot')
to_process = all_df.subtract(processed_files)

# Transform data
# Collect once and reuse the rows for both the count and the loop, so the
# subtraction is not evaluated twice.
pending_rows = to_process.collect()
num_files = len(pending_rows)
for row in pending_rows:
  dbfs_path = row['processed_files']
  # Strip only the leading /dbfs prefix, not later occurrences inside the path
  local_path = dbfs_path.replace('/dbfs', '', 1)
  try:
    transform(dbfs_path, local_path)
    # NOTE: the path is interpolated directly into the statement; a file name
    # containing a single quote would break it.
    spark.sql(f"INSERT INTO datalake.processed_files_linebot VALUES ('{dbfs_path}')")
  except Exception as e:
    # Best effort: report the failure and continue with the remaining files.
    # The file is not recorded as processed, so it stays in to_process on the
    # next run.
    print(f"File {local_path} failed to get processed!")
    print(str(e))

# Send email notification
send_email("Azure Linebot Datalake processing completed", f"Files processed: <strong>{num_files}</strong>")
print("Processing finished.")