###

text preprocessing: token, remove stop words (I have Spacy worked out for lemma, attached code below)

3 outputs: 
            1. number of classified (for each month)
            2. average of each category's percentage across users (real time)
            3. average of each category's percentage across users (user life time)

In [1]:
# different with the code for cluster
import findspark
findspark.init()
#
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors
import pyspark.sql.functions as func

from pyspark.sql.window import Window
from pyspark.sql import DataFrame

import re

from optparse import OptionParser

import string
from typing import Iterable
from pyspark.sql.functions import udf, col
from pyspark.sql import functions as F

from pyspark.ml import Pipeline

from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover


In [2]:
spark = SparkSession.builder \
    .appName("Spark NLP2")\
    .master("local[*]")\
    .config("spark.driver.memory","4G")\
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.kryoserializer.buffer.max", "500m")\
    .getOrCreate()


In [3]:
import spacy
from spacy.lang.en.stop_words import STOP_WORDS

In [4]:
# Functions
def classify_Food(discription):
    b=any([(x in new_dict['Food']) for x in set(discription)])
    if b:
        return 1
    else:
        return 0
    
classify_Food_udf=udf(classify_Food,IntegerType())

def classify_Event(discription):
    b=any([(x in new_dict['Event']) for x in set(discription)])
    if b:
        return 1
    else:
        return 0
    
classify_Event_udf=udf(classify_Event,IntegerType())

def classify_People(discription):
    b=any([(x in new_dict['People']) for x in set(discription)])
    if b:
        return 1
    else:
        return 0
    
classify_People_udf=udf(classify_People,IntegerType())

def classify_Activity(discription):
    b=any([(x in new_dict['Activity']) for x in set(discription)])
    if b:
        return 1
    else:
        return 0
    
classify_Activity_udf=udf(classify_Activity,IntegerType())

def classify_Travel(discription):
    b=any([(x in new_dict['Travel']) for x in set(discription)])
    if b:
        return 1
    else:
        return 0
    
classify_Travel_udf=udf(classify_Travel,IntegerType())

def classify_Transportation(discription):
    b=any([(x in new_dict['Transportation']) for x in set(discription)])
    if b:
        return 1
    else:
        return 0
    
classify_Transportation_udf=udf(classify_Transportation,IntegerType())

def classify_Utility(discription):
    b=any([(x in new_dict['Utility']) for x in set(discription)])
    if b:
        return 1
    else:
        return 0
    
classify_Utility_udf=udf(classify_Utility,IntegerType())

def classify_Cash(discription):
    b=any([(x in new_dict['Cash']) for x in set(discription)])
    if b:
        return 1
    else:
        return 0
    
classify_Cash_udf=udf(classify_Cash,IntegerType())

def classify_Illegal(discription):
    b=any([(x in new_dict['Illegal/Sarcasm']) for x in set(discription)])
    if b:
        return 1
    else:
        return 0
    
classify_Illegal_udf=udf(classify_Illegal,IntegerType())

# convert to dummy variable
def convert_dummy(x):
    if x>0:
        return 1
    else:
        return 0
convert_dummy_udf=udf(convert_dummy,IntegerType())

countTokens = udf(lambda words: len(words), IntegerType())


In [5]:
# input: venmo data

inputFile = spark\
    .read\
    .option("inferSchema","true")\
    .option("header", "true")\
    .csv("/Users/yichuan/Desktop/Venmo project/data/venmoSample.csv")
inputFile = inputFile.na.fill('')

In [6]:
# user life time 
my_window = Window.partitionBy('user1')
inputFile = inputFile.withColumn("min_date", min(inputFile['datetime']).over(my_window))
inputFile = inputFile.withColumn("diff_date", datediff('datetime','min_date'))
inputFile = inputFile.withColumn("customer_lifetime", F.when(inputFile['diff_date']==0,0).otherwise(inputFile['diff_date']/30+1).cast(IntegerType()))


In [7]:
inputFile.show(5)

+-----+-----+----------------+-------------------+--------------------+-----------+--------------------+-------------------+---------+-----------------+
|user1|user2|transaction_type|           datetime|         description|is_business|            story_id|           min_date|diff_date|customer_lifetime|
+-----+-----+----------------+-------------------+--------------------+-----------+--------------------+-------------------+---------+-----------------+
| 2866|30588|         payment|2015-09-15 14:27:00|               Stuff|      false|55f82ab4cd03c9af2...|2015-09-15 14:27:00|        0|                0|
| 6620| 6507|         payment|2012-04-16 14:32:43|for taking me out...|      false|54e4165dcd03c9af2...|2012-04-16 14:32:43|        0|                0|
| 6620| 6606|         payment|2013-08-05 17:21:57|       Ralph LAU-ren|      false|51ffdb8f7de518fa3...|2012-04-16 14:32:43|      476|               16|
|28170|27438|         payment|2012-11-03 02:35:06|           November!|      false

In [8]:
# remove punctuation

punctuations = '~|`|\!|@|#|$|%|^|&|\*|\(|\)|-|\+|=|_|\{|\}|\[|\]|;|:|\?|\.|,|<|>|/|\'|\"'

inputFile=inputFile\
.withColumn('description_rm_pun',regexp_replace(col('description'),punctuations, ' '))

# keep emoji
inputFile=inputFile\
.withColumn('description_emoji',regexp_replace(col('description_rm_pun'),'[\w\s]', ''))
# keep text
inputFile=inputFile\
.withColumn('description_word',regexp_replace(col('description_rm_pun'),'[^\w\s]', ''))



In [9]:
inputFile = inputFile.withColumn("total_tokens", countTokens(col("description")))
inputFile = inputFile.withColumn("year", year("datetime"))
inputFile = inputFile.withColumn("month", month("datetime"))

In [10]:
rt = RegexTokenizer().setInputCol("description_word").setOutputCol("tokenized_words").setPattern(" ").setToLowercase(True)
englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
stops = StopWordsRemover().setStopWords(englishStopWords).setInputCol("tokenized_words").setOutputCol("tokenized_words_filtered")
pipeline = Pipeline(stages=[rt, stops])

# Fit the pipeline to dataframe
pipelineFit = pipeline.fit(inputFile)
inputFile = pipelineFit.transform(inputFile)

In [11]:
## count tokens in each transaction
inputFile = inputFile.withColumn("total_word_tokens", countTokens(col("description_word"))) 
inputFile = inputFile.withColumn("total_emoji_tokens", countTokens(col("description_emoji"))) 
inputFile = inputFile.withColumn("if_emoji_only", inputFile['total_emoji_tokens'] == inputFile['total_tokens'])
inputFile = inputFile.withColumn("is_emoji", inputFile['total_emoji_tokens'] > 0)


In [12]:
## Emoji Analysis
agg_data = inputFile.groupBy("year", "month").agg(
                    count('user1').alias("total_transactions_per_month"),
                    sum('total_tokens').alias("total_tokens_per_month"),
                    sum('total_emoji_tokens').alias("total_emoji_tokens_per_month"),
                    (F.sum(F.col("if_emoji_only").cast("long")).alias("total_emoji_only_per_month"))).orderBy("year", "month")


agg_data = agg_data.withColumn('percent_of_emoji', (F.col("total_emoji_tokens_per_month") / F.col("total_tokens_per_month")))           
agg_data = agg_data.withColumn('percent_of_emoji_only', (F.col("total_emoji_only_per_month") / F.col("total_transactions_per_month")))                     
agg_data = agg_data.sort('year', 'month', ascending=True)

user_averageEmoji = inputFile.groupBy("year", "month", "user1").agg(
                    count('user1').alias("total_transactions_per_month"),
                    (F.sum(F.col("is_emoji").cast("long")).alias("total_emoji_transactions_per_month")))    

user_averageEmoji = user_averageEmoji.withColumn('emoji_avg',user_averageEmoji['total_emoji_transactions_per_month']/user_averageEmoji['total_transactions_per_month'])

### We need this as output ###
plot_user_emoji = user_averageEmoji.groupBy("year", "month").agg(
    avg("emoji_avg").alias("avg_emoji_usage"),
stddev_pop("emoji_avg").alias("sd_emoji_usage"))
plot_user_emoji = plot_user_emoji.sort('year', 'month', ascending=True)


first_emoji_date = inputFile.filter(col("total_emoji_tokens") > 0).groupBy("user1").agg(min('datetime'))
first_emoji_date = first_emoji_date.withColumn("year", year("min(datetime)"))
first_emoji_date = first_emoji_date.withColumn("month", month("min(datetime)"))

plot_first_emoji = first_emoji_date.groupBy("year", "month").count().sort('year', 'month', ascending=True)


In [13]:
inputFile.show(5)

+-----+-----+----------------+-------------------+--------------------+-----------+--------------------+-------------------+---------+-----------------+--------------------+-----------------+--------------------+--------------------+------------+----+-----+--------------------+------------------------+-----------------+------------------+-------------+--------+
|user1|user2|transaction_type|           datetime|         description|is_business|            story_id|           min_date|diff_date|customer_lifetime|  description_rm_pun|description_emoji|    description_word|         lower_words|total_tokens|year|month|     tokenized_words|tokenized_words_filtered|total_word_tokens|total_emoji_tokens|if_emoji_only|is_emoji|
+-----+-----+----------------+-------------------+--------------------+-----------+--------------------+-------------------+---------+-----------------+--------------------+-----------------+--------------------+--------------------+------------+----+-----+---------------

-----

In [14]:
## input : dictionary
emoji_dict = spark\
    .read\
    .option("inferSchema","true")\
    .option("header", "true")\
    .csv("/Users/yichuan/Desktop/Venmo project/data/Venmo_Emoji_Classification_Dictionary.csv")
word_dict = spark\
    .read\
    .option("inferSchema","true")\
    .option("header", "true")\
    .csv("/Users/yichuan/Desktop/Venmo project/data/Venmo_Word_Classification_Dictionary.csv")

In [15]:
new_dict = word_dict.toPandas().to_dict(orient='list')
filtered = {k: [x for x in v if x is not None] for k, v in new_dict.items()}
new_dict.clear()
new_dict.update(filtered)

emoji_dict = emoji_dict.toPandas().to_dict(orient='list')
filtered = {k: [x for x in v if x is not None] for k, v in emoji_dict.items()}
emoji_dict.clear()
emoji_dict.update(filtered)


In [17]:
for col in emoji_dict.keys():
    new_dict[col]=new_dict[col]+ emoji_dict[col]
    
inputFile = inputFile.withColumn("Food_emoji", classify_Food_udf("description_emoji"))
inputFile = inputFile.withColumn("Event_emoji", classify_Event_udf("description_emoji"))
inputFile = inputFile.withColumn("People_emoji", classify_People_udf("description_emoji"))
inputFile = inputFile.withColumn("Activity_emoji", classify_Activity_udf("description_emoji"))
inputFile = inputFile.withColumn("Travel_emoji", classify_Travel_udf("description_emoji"))
inputFile = inputFile.withColumn("Transportation_emoji", classify_Transportation_udf("description_emoji"))
inputFile = inputFile.withColumn("Utility_emoji", classify_Utility_udf("description_emoji"))
inputFile = inputFile.withColumn("Food_word", classify_Food_udf("tokenized_words_filtered"))
inputFile = inputFile.withColumn("Event_word", classify_Event_udf("tokenized_words_filtered"))
inputFile = inputFile.withColumn("People_word", classify_People_udf("tokenized_words_filtered"))
inputFile = inputFile.withColumn("Activity_word", classify_Activity_udf("tokenized_words_filtered"))
inputFile = inputFile.withColumn("Travel_word", classify_Travel_udf("tokenized_words_filtered"))
inputFile = inputFile.withColumn("Transportation_word", classify_Transportation_udf("tokenized_words_filtered"))
inputFile = inputFile.withColumn("Utility_word", classify_Utility_udf("tokenized_words_filtered"))
inputFile = inputFile.withColumn("Cash_word", classify_Cash_udf("tokenized_words_filtered"))
inputFile = inputFile.withColumn("Illegal_word", classify_Illegal_udf("tokenized_words_filtered"))

inputFile = inputFile.withColumn("Event", F.col("Event_emoji")+F.col('Event_word'))
inputFile = inputFile.withColumn("Travel", F.col("Travel_emoji")+F.col('Travel_word'))
inputFile = inputFile.withColumn("Food", F.col("Food_emoji")+F.col('Food_word'))
inputFile = inputFile.withColumn("Activity", F.col("Activity_emoji")+F.col('Activity_word'))
inputFile = inputFile.withColumn("Transportation", F.col("Transportation_emoji")+F.col('Transportation_word'))
inputFile = inputFile.withColumn("People", F.col("People_emoji")+F.col('People_word'))
inputFile = inputFile.withColumn("Utility", F.col("Utility_emoji")+F.col('Utility_word'))
inputFile = inputFile.withColumn("Event",  convert_dummy_udf("Event"))
inputFile = inputFile.withColumn("Travel",  convert_dummy_udf("Travel"))
inputFile = inputFile.withColumn("Food",  convert_dummy_udf("Food"))
inputFile = inputFile.withColumn("Activity",  convert_dummy_udf("Activity"))
inputFile = inputFile.withColumn("Transportation",  convert_dummy_udf("Transportation"))
inputFile = inputFile.withColumn("People",  convert_dummy_udf("People"))
inputFile = inputFile.withColumn("Utility",  convert_dummy_udf("Utility"))
inputFile = inputFile.withColumn("Total_Sum_Category_Dummies", F.col("Event")+ F.col('Travel') + F.col('Food') +F.col('Activity') + F.col('Transportation') + F.col('People') + F.col('Utility') + F.col('Illegal_word') + F.col('Cash_word'))

    

### output 1

In [22]:
## output file: number of classified (group by month)

agg_number_of_classified = inputFile.groupBy("year", "month").agg(
                    count('user1').alias("Total_Transactions_per_month"),
                    sum('Total_Sum_Category_Dummies').alias("Total_Classified_per_month")).orderBy("year", "month")

In [23]:
agg_number_of_classified.show(10)

+----+-----+----------------------------+--------------------------+
|year|month|Total_Transactions_per_month|Total_Classified_per_month|
+----+-----+----------------------------+--------------------------+
|2011|   11|                           1|                         1|
|2011|   12|                           4|                         2|
|2012|    3|                           1|                         1|
|2012|    4|                         202|                       136|
|2012|    5|                         288|                       200|
|2012|    6|                         295|                       220|
|2012|    7|                         326|                       213|
|2012|    8|                         530|                       352|
|2012|    9|                         679|                       494|
|2012|   10|                         829|                       548|
+----+-----+----------------------------+--------------------------+
only showing top 10 rows




### avg across user 

In [18]:
### 1. for venmo time

agg_categories_1 = inputFile.groupBy('user1',"year", "month").agg(
                    count('user1').alias("user_total_transactions"),
                    sum('Total_Sum_Category_Dummies').alias("user_total_dummies"),
                    
                    sum('Event').alias("Events_per_month"),
                    sum('Travel').alias("Travel_per_month"),
                    sum('Food').alias("Food_per_month"),
                    sum('Activity').alias("Activity_per_month"),
                    sum('Transportation').alias("Transportation_per_month"),
                    sum('People').alias("People_per_month"),
                    sum('Utility').alias("Utility_per_month"),
                    sum('Illegal_word').alias("Illegal_word_per_month"),
                    sum('Cash_word').alias("Cash_word_per_month")).orderBy("year", "month")



In [19]:
agg_categories_1.show(5)

+-----+----+-----+-----------------------+------------------+----------------+----------------+--------------+------------------+------------------------+----------------+-----------------+----------------------+-------------------+
|user1|year|month|user_total_transactions|user_total_dummies|Events_per_month|Travel_per_month|Food_per_month|Activity_per_month|Transportation_per_month|People_per_month|Utility_per_month|Illegal_word_per_month|Cash_word_per_month|
+-----+----+-----+-----------------------+------------------+----------------+----------------+--------------+------------------+------------------------+----------------+-----------------+----------------------+-------------------+
|45688|2011|   11|                      1|                 1|               1|               0|             0|                 0|                       0|               0|                0|                     0|                  0|
|73992|2011|   12|                      1|                 0|       

In [38]:
# percentage of 'food'... for each user (monthly)
agg_categories_1=agg_categories_1 \
.withColumn('Event',F.col('Events_per_month')/F.col('user_total_dummies')) \
.withColumn('Travel',F.col('Travel_per_month')/F.col('user_total_dummies')) \
.withColumn('Food',F.col('Food_per_month')/F.col('user_total_dummies')) \
.withColumn('Activity',F.col('Activity_per_month')/F.col('user_total_dummies')) \
.withColumn('Transportation',F.col('Transportation_per_month')/F.col('user_total_dummies')) \
.withColumn('People',F.col('People_per_month')/F.col('user_total_dummies')) \
.withColumn('Utility',F.col('Utility_per_month')/F.col('user_total_dummies')) \
.withColumn('Illegal',F.col('Illegal_word_per_month')/F.col('user_total_dummies')) \
.withColumn('Cash',F.col('Cash_word_per_month')/F.col('user_total_dummies')) \

In [30]:
### 2. for customer life time

agg_categories_customer_1 = inputFile.groupBy('user1',"customer_lifetime").agg(
                    count('user1').alias("user_total_transactions"),
                    sum('Total_Sum_Category_Dummies').alias("user_total_dummies"),
                    
                    sum('Event').alias("Events_per_month"),
                    sum('Travel').alias("Travel_per_month"),
                    sum('Food').alias("Food_per_month"),
                    sum('Activity').alias("Activity_per_month"),
                    sum('Transportation').alias("Transportation_per_month"),
                    sum('People').alias("People_per_month"),
                    sum('Utility').alias("Utility_per_month"),
                    sum('Illegal_word').alias("Illegal_word_per_month"),
                    sum('Cash_word').alias("Cash_word_per_month")).orderBy("customer_lifetime")



In [32]:
# percentage of 'food'... for each user (monthly)
agg_categories_customer_1 = agg_categories_customer_1 \
.withColumn('Event',F.col('Events_per_month')/F.col('user_total_dummies')) \
.withColumn('Travel',F.col('Travel_per_month')/F.col('user_total_dummies')) \
.withColumn('Food',F.col('Food_per_month')/F.col('user_total_dummies')) \
.withColumn('Activity',F.col('Activity_per_month')/F.col('user_total_dummies')) \
.withColumn('Transportation',F.col('Transportation_per_month')/F.col('user_total_dummies')) \
.withColumn('People',F.col('People_per_month')/F.col('user_total_dummies')) \
.withColumn('Utility',F.col('Utility_per_month')/F.col('user_total_dummies')) \
.withColumn('Illegal',F.col('Illegal_word_per_month')/F.col('user_total_dummies')) \
.withColumn('Cash',F.col('Cash_word_per_month')/F.col('user_total_dummies')) \

### output 2, 3

In [41]:

agg_categories = agg_categories_1.groupBy("year", "month").agg(
                    sum('user_total_transactions').alias("total_transactions_per_month"),  ## sum of 'user_total_transaction'
                    sum('user_total_dummies').alias("total_dummies_per_month"),  ## sum of 'user_total_dummies'
                    
                    avg('Event').alias("Events_per_month"),
                    avg('Travel').alias("Travel_per_month"),
                    avg('Food').alias("Food_per_month"),
                    avg('Activity').alias("Activity_per_month"),
                    avg('Transportation').alias("Transportation_per_month"),
                    avg('People').alias("People_per_month"),
                    avg('Utility').alias("Utility_per_month"),
                    avg('Illegal').alias("Illegal_word_per_month"),
                    avg('Cash').alias("Cash_word_per_month")).orderBy("year", "month")


agg_categories_customer_lifetime = agg_categories_customer_1.groupBy("customer_lifetime").agg(
                    sum('user_total_transactions').alias("total_transactions_per_customer_stage"),
                    sum('user_total_dummies').alias("total_dummies_per_customer_stage"),
                    
                    avg('Event').alias("Events_per_customer_stage"),
                    avg('Travel').alias("Travel_per_customer_stage"),
                    avg('Food').alias("Food_per_customer_stage"),
                    avg('Activity').alias("Activity_per_customer_stage"),
                    avg('Transportation').alias("Transportation_per_customer_stage"),
                    avg('People').alias("People_per_customer_stage"),
                    avg('Utility').alias("Utility_per_customer_stage"),
                    avg('Illegal').alias("Illegal_word_per_customer_stage"),
                    avg('Cash').alias("Cash_word_per_customer_stage")).orderBy("customer_lifetime")

In [42]:
agg_categories.show(5)

+----+-----+----------------------------+-----------------------+----------------+------------------+------------------+-------------------+------------------------+-------------------+-------------------+----------------------+-------------------+
|year|month|total_transactions_per_month|total_dummies_per_month|Events_per_month|  Travel_per_month|    Food_per_month| Activity_per_month|Transportation_per_month|   People_per_month|  Utility_per_month|Illegal_word_per_month|Cash_word_per_month|
+----+-----+----------------------------+-----------------------+----------------+------------------+------------------+-------------------+------------------------+-------------------+-------------------+----------------------+-------------------+
|2011|   11|                           1|                      1|             1.0|               0.0|               0.0|                0.0|                     0.0|                0.0|                0.0|                   0.0|                0.0|
|201