# Squishing Transcripts
### Code that shows you how to combine Nexidia utterances into a single row per transcript

In this notebook, we'll assume that we have a list of UCIDs. A UCID is a common unique identifier for a phone conversation.

In [1]:
import pandas as pd
pd.set_option('display.max_columns', 500)

In [2]:
import pyspark
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql import Row, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as f
from pyspark.sql.window import Window

import os
from os.path import expanduser, join, abspath

from IPython.display import display, HTML

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession.builder.appName("SparkTest").config("spark.sql.warehouse.dir", warehouse_location).enableHiveSupport().getOrCreate()
sc = spark.sparkContext

One way to load a long list of UCIDs is to save them in a file called `ucid.py` in the same directory as this notebook and import them as a list. This also keeps the notebook a lot cleaner.


In [3]:
# Example of ucid.py:

# ucid_list = [
#    '83847402203932308'
#   ,'2039420808029834'
#   ,'013908304230982']

Nexidia data is notoriously slow to query. The main reason is that the HDFS partitions do NOT correspond to the date the call took place.


**day_id** = The day the file was imported (it can contain calls from any earlier day, but never from a later one)

**interactiondatetime** = The day the call took place

The date limits below speed up the query by restricting it to the import dates and call dates that you are sure contain your UCIDs.

In [4]:
from ucid import ucid_list
min_importdate = '2019-06-18'
max_importdate = '9999-12-31'

min_calldate = '2019-06-17'
max_calldate = '2019-07-01'
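The relationship between the two dates described above can be turned into a small helper. This is a minimal sketch, not part of the original notebook: `import_window` is a hypothetical name, and it assumes both columns hold `YYYY-MM-DD` strings and that a call is never imported before the day it took place. It returns a conservative lower bound for `day_id`. If you know imports lag by a day or more, a later value (as in the cell above) works too.

```python
from datetime import date

def import_window(min_calldate, max_calldate, open_end="9999-12-31"):
    """Return (min_importdate, max_importdate) for a call-date window."""
    start = date.fromisoformat(min_calldate)  # raises ValueError on malformed input
    end = date.fromisoformat(max_calldate)
    if start > end:
        raise ValueError("min_calldate is after max_calldate")
    # Assumption: a call is never imported before the day it took place,
    # so the earliest useful day_id equals the earliest call date.
    # The upper bound stays open because a late import can carry old calls.
    return start.isoformat(), open_end

min_importdate, max_importdate = import_window("2019-06-17", "2019-07-01")
print(min_importdate, max_importdate)
```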

The next cell builds the SQL statement dynamically from the date limits and the UCID list.

In [5]:
sql_text = "SELECT udfvarchar2 as UCID, T.day_id, T.transcript, T.start_ms \
FROM (select interactiondatetime, interactionid from jrnl.nexidia_interaction \
where day_id >= '" + min_importdate + "' \
and day_id <= '" + max_importdate + "' \
and interactiondatetime >= '" + min_calldate + "' \
and interactiondatetime <= '" + max_calldate + "' \
) as i \
INNER JOIN (select udfvarchar2, interactionid from jrnl.nexidia_interaction_metadata \
where day_id >= '"+ min_importdate +"' and day_id <= '" + max_importdate + "') as m \
ON i.interactionid = m.interactionid \
INNER JOIN (select day_id, transcript, start_ms, interactionid \
from jrnl.nexidia_transcript where day_id >= '" + min_importdate + "' and day_id <= '" + max_importdate + "') as t \
ON i.interactionid = t.interactionid \
WHERE udfvarchar2 in ("


sql_text = sql_text + ','.join('\'{0}\''.format(w) for w in ucid_list) + ')'
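Because the statement is assembled by string concatenation, a malformed entry in `ucid_list` (a stray quote, an empty string) would produce invalid SQL. Below is a minimal sketch of a guard for the `IN (...)` part. `build_in_clause` is a hypothetical helper, not part of the original notebook, and it assumes UCIDs are purely numeric strings, as in the sample list.

```python
import re

def build_in_clause(ucids):
    """Return a quoted, parenthesised SQL list for the given UCIDs."""
    quoted = []
    for u in ucids:
        u = str(u).strip()
        # Assumption: a valid UCID contains ASCII digits only.
        if not re.fullmatch(r"[0-9]+", u):
            raise ValueError("Unexpected UCID value: {!r}".format(u))
        quoted.append("'{}'".format(u))
    if not quoted:
        raise ValueError("No UCIDs supplied")
    return "(" + ", ".join(quoted) + ")"

in_clause = build_in_clause(["83847402203932308", "2039420808029834", "013908304230982"])
print(in_clause)
```

The returned string already includes both parentheses, so it would replace the trailing `in (` ... `)` portion of the statement rather than being appended to it.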

In [6]:
df = spark.sql(sql_text)

In [7]:
df.limit(10).toPandas()

index,UCID,day_id,transcript,start_ms
0,11000054081560830654,2019-06-20,UP INSTEAD ... OK,106080
1,11000054081560830654,2019-06-20,YEP IS ... SOMETHING WITH THE APPLICATION ITSE...,94560
2,11000054081560830654,2019-06-20,SHE'S ... OFF FOR THE EXTRA FOR APPLICATION IT...,43220
3,11000054081560830654,2019-06-20,NAME ... NO PROBLEM HAVE A NINE,122080
4,11000054081560830654,2019-06-20,I WILL GO AHEAD AND SEE WHAT I CAN HELP YOU WI...,108030
5,11000054081560830654,2019-06-20,IF IT'S ON THE MOBILE PHONE THAT THEY ... ARE ...,79090
6,11000054081560830654,2019-06-20,YES ... THAT IS NOT CORRECT,118550
7,11000054081560830654,2019-06-20,THIS IS ACTUALLY WITH REPAIRING BILLING ON THE...,18380
8,11000054081560830654,2019-06-20,NO THAT'S NOT COOL,5900
9,11000054081560830654,2019-06-20,HELLO MY NAME IS MARK FROM XFINITY MOBILE SURE...,13420


Now we'll need to aggregate the text.

This code is designed to aggregate multiple columns dynamically: supply the list of columns to aggregate in the variable `agg_cols`.

In this case, we're only doing it for one variable.

In [8]:
agg_cols=['transcript']

In [9]:
# Edit the ordering and groupBy columns below to suit your data.
# `ascending` needs one flag per ordering column (three here).
order_cols = ["UCID", "day_id", "start_ms"]

# One collect_list expression per column to aggregate
agg_exprs = [collect_list(c).alias(c) for c in agg_cols]

newDF = df.orderBy(order_cols, ascending=[1, 1, 1]).groupBy("UCID").agg(*agg_exprs)

# Join each collected list into a single string, separating utterances with " ... "
for c in agg_cols:
    newDF = newDF.withColumn(c, concat_ws(" ... ", col(c)))

In [10]:
newDF.limit(1).toPandas()

index,UCID,transcript
0,10000182181560834686,SORRY THERE'S NO AVAILABLE SUPERVISOR ... HELL...
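Spark's documentation describes `collect_list` as non-deterministic, because the order of the collected values depends on row order after a shuffle. It is therefore worth spot-checking that the utterances really come out in `start_ms` order. Below is a minimal plain-Python sketch of the intended result, for use as a reference on a small sample pulled to the driver. `squish` and the sample rows are hypothetical and not part of the original notebook.

```python
from collections import defaultdict

def squish(rows, sep=" ... "):
    """Join utterances per UCID, ordered by (day_id, start_ms).

    rows: iterable of (ucid, day_id, start_ms, transcript) tuples.
    """
    grouped = defaultdict(list)
    for ucid, day_id, start_ms, transcript in rows:
        grouped[ucid].append((day_id, int(start_ms), transcript))
    result = {}
    for ucid, utterances in grouped.items():
        # Sort on the two ordering fields only, never on the text itself
        utterances.sort(key=lambda u: (u[0], u[1]))
        result[ucid] = sep.join(text for _, _, text in utterances)
    return result

# Made-up rows for illustration only
sample = [
    ("A", "2019-06-20", 13420, "HELLO"),
    ("A", "2019-06-20", 5900, "NO"),
    ("B", "2019-06-20", 100, "HI"),
    ("A", "2019-06-20", 18380, "BILLING"),
]
print(squish(sample))
```

To use it, collect the rows for one or two UCIDs from `df`, pass them through `squish`, and compare the strings with the matching rows of `newDF`. If the order differs, one option within Spark is to collect `struct(start_ms, transcript)` pairs and apply `sort_array` before joining.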


In [11]:
newDF.createOrReplaceTempView("mytempTable")
# Replace SCHEMA.TABLENAME with your target table; `spark` is the session created above
spark.sql("create table SCHEMA.TABLENAME as select * from mytempTable")