import pyspark
import findspark
from pyspark.sql import SQLContext, SparkSession

# Connect to the standalone Spark master, reusing a session if one is already active.
# NOTE: despite its name, `sc` is a SparkSession, not a SparkContext.
sc = (
    SparkSession.builder
    .master('spark://10.0.0.118:7077')
    .appName("sparkFromJupyter")
    .config("spark.rpc.message.maxSize", 2040)
    .getOrCreate()
)

# SQLContext wrapper built on top of the same session and its underlying context.
sqlContext = SQLContext(sc.sparkContext, sparkSession=sc)

# Report the cluster-side Spark version and the local PySpark package version.
print(f"Spark Version: {sc.version}")
print(f"PySpark Version: {pyspark.__version__}")

In [None]:
# https://www.hackdeploy.com/how-to-run-pyspark-in-a-jupyter-notebook/
# ^ Set-up details for Pyspark + Jupyter

# https://phoenixnap.com/kb/install-spark-on-windows-10
# ^ Details on installing Pyspark

# https://stackoverflow.com/questions/69669524/spark-illegal-character-in-path
# ^ Details on fixing access to Spark Cluster in cmd

# spark-shell --master spark://10.0.0.118:7077
# ^ How to kick off spark-shell in cmd

# https://www.sicara.ai/blog/2017-05-02-get-started-pyspark-jupyter-notebook-3-minutes
# ^ Details on Findspark

# https://www.geeksforgeeks.org/python-xml-to-json/
# ^ XML to JSON

# https://stackoverflow.com/questions/49675860/pyspark-converting-json-string-to-dataframe
# ^ Resource for converting JSON into dataframe using parallelize

# https://sparkbyexamples.com/pyspark/pyspark-window-functions/
# ^ Pyspark Window function

In [None]:
import findspark
import pyspark
from pyspark.sql import SparkSession, Row, Window
import pandas as pd
from collections import OrderedDict
import json, xmltodict
import pyspark.sql.functions as F

In [None]:
# Create (or reuse) the Spark session used for the SMS analysis.
#
# Constructing pyspark.SparkContext(...) directly raises
# "ValueError: Cannot run multiple SparkContexts at once" when a context is
# already active in this kernel, so go through the builder's getOrCreate()
# instead. The "SMS" app name only takes effect when a new context is created.
findspark.init()
spark = SparkSession.builder.appName("SMS").getOrCreate()

# Keep `sc` bound to the SparkContext backing the session.
sc = spark.sparkContext

In [None]:
# Import the SMS XML file provided by the SMS Backup and Restore app,
# parse it into a dictionary, and dump the messages to a JSON file.

# Cols in Data: Protocol, Address (sender), date, type, subject, body, toa, service_center, read,
# status, locked, date_sent, sub_id, readable_date, contact_name

# All column names are prepended with an '@'

# Time to Run: 2 Minutes (author-reported)

# Input/output locations, kept in one place so they are easy to change.
SMS_XML_PATH = r"/Users/rajn/Library/CloudStorage/GoogleDrive-kinqraj@gmail.com/My Drive/Backup/SMS-12_24_22-NoMedia.xml"
SMS_JSON_PATH = "sms_data.json"

# The `with` block closes the file on exit, so no explicit close() is needed.
with open(SMS_XML_PATH, encoding="utf8") as xml_file:
    data_dict = xmltodict.parse(xml_file.read())

# Convert the <sms> entries under the <smses> root into a JSON string
json_data = json.dumps(data_dict['smses']['sms'])

with open(SMS_JSON_PATH, "w") as json_file:
    json_file.write(json_data)

In [None]:
 
# Originally I used a for loop to convert the dict I had to a df, but it was extraordinarily slow (gave up after 1 hour of run)

#for key in data_dict['smses']['sms']:
#         newRow = spark.createDataFrame([(key['@contact_name'], key['@readable_date'], key['@body'])], columns)
#         sms_df.union(newRow)

In [None]:
# Convert JSON data into dataframe
# 44 Seconds (author-reported run time)
# Reads the "sms_data.json" file from the working directory; no schema is
# passed to the reader here.

sms_df = spark.read.json("sms_data.json")

In [None]:
# Drop the columns that are unneeded / unknown to save space.
unused_cols = [
    "@service_center", "@status", "@sub_id", "@subject", "@toa",
    "@type", "@locked", "@protocol", "@read", "@sc_toa",
]
sms_df = sms_df.drop(*unused_cols)

# Sender label: rows whose '@date_sent' equals 0 are tagged as sent by me,
# everything else as sent by the contact.
sent_by_me_col = F.when(F.col('@date_sent') == 0, 'Sent By Raj').otherwise('Sent By Contact')

# Parse the readable date into a timestamp, then derive the year and the
# abbreviated weekday from it so later cells can group on them.
sent_at = F.to_timestamp("@readable_date", 'MMM d, yyyy HH:mm:ss')
sms_df = (
    sms_df
    .withColumn("@timestamp_sent", sent_at)
    .withColumn("@timestamp_year", F.year("@timestamp_sent"))
    .withColumn("@timestamp_weekday", F.date_format(F.col("@timestamp_sent"), 'E'))
    .withColumn("@sent_by_me", sent_by_me_col)
)

In [None]:
# Print the first 5 rows of sms_df as a quick sanity check
sms_df.show(5)

In [None]:
# Message counts per contact in two shapes:
#   - long: one row per (contact, year) pair
#   - wide: one row per contact with one column per year
sms_count_df = sms_df.groupBy('@contact_name', '@timestamp_year').count()
sms_count_pivot_df = (
    sms_df
    .groupBy('@contact_name')
    .pivot('@timestamp_year')
    .count()
)
# sms_count_pivot_df.show(1000)
sms_count_df.show(1000)

In [None]:
# Use a window to determine, for each contact, which year I messaged them most.
# Order by count DESCENDING so that rank_year == 1 marks the busiest year for
# each contact (an ascending order would rank the quietest year first).
contactCountWindowSpec = Window.partitionBy("@contact_name").orderBy(F.col("count").desc())

# Keep the ranked DataFrame in the variable and display it separately:
# show() only prints and returns None.
sms_window_contact_year_count_df = sms_count_df.withColumn("rank_year", F.row_number().over(contactCountWindowSpec))
sms_window_contact_year_count_df.show()


In [None]:
# Use a window to determine which contact + year combo I messaged most.
# The window has no partition columns, so every row is ranked against all
# others by descending count.
countWindowSpec = Window.partitionBy().orderBy(F.col("count").desc())

# Keep the ranked DataFrame in the variable and display it separately:
# show() only prints and returns None.
sms_window_contact_year_count_df = sms_count_df.withColumn("rank_year", F.row_number().over(countWindowSpec))
sms_window_contact_year_count_df.show(50)


In [None]:
# Use a window to determine, for each year, who I messaged most.
# Rows are ranked within each year by descending count and only the top 10
# per year are kept.
countWindowSpec = Window.partitionBy('@timestamp_year').orderBy(F.col("count").desc())

# Keep the ranked DataFrame in the variable and display it separately:
# show() only prints and returns None.
sms_window_contact_year_count_df = (
    sms_count_df
    .withColumn("rank_year", F.row_number().over(countWindowSpec))
    .filter(F.col("rank_year") <= 10)
)
sms_window_contact_year_count_df.show(60)

In [None]:
# IDEAS
# Percent of total messages for a given year
# Merge in FB texts by name match 

In [None]:
# Which year did I send the most messages?
# Rows with an @date_sent of 0 represent messages from me in this dataset, so
# keep only those, count them per year, and rank the years by that count.

countWindowSpec = Window.partitionBy().orderBy(F.col('count').desc())

sms_count_pivot_df = (
    sms_df
    .filter(sms_df['@date_sent'] == 0)
    .groupBy('@timestamp_year')
    .count()
)

(
    sms_count_pivot_df
    .withColumn("rank_sends_year", F.row_number().over(countWindowSpec))
    .show()
)

In [None]:
# Per contact: who sends more, me or them?
# Pivoting on @sent_by_me yields one count column per sender label; rows are
# then numbered by the 'Sent By Raj' count, highest first.

countWindowSpec = Window.partitionBy().orderBy(F.col('Sent By Raj').desc())

sms_count_df = (
    sms_df
    .groupBy('@contact_name')
    .pivot('@sent_by_me')
    .count()
)

(
    sms_count_df
    .withColumn("rank_sends_year", F.row_number().over(countWindowSpec))
    .show()
)

In [None]:
# Per year: who sends more, me or my contacts?
# Messages whose contact name is '(unknown)' are excluded before counting.

countWindowSpec = Window.partitionBy().orderBy(F.col('Sent By Raj').desc())

known_contacts_df = sms_df.filter(sms_df['@contact_name'] != '(unknown)')
sms_count_df = (
    known_contacts_df
    .groupBy('@timestamp_year')
    .pivot('@sent_by_me')
    .count()
)

(
    sms_count_df
    .withColumn("rank_sends_year", F.row_number().over(countWindowSpec))
    .show()
)

In [None]:
# Per (year, contact) pair: who sends more, me or them?
# Messages whose contact name is '(unknown)' are excluded before counting.

countWindowSpec = Window.partitionBy().orderBy(F.col('Sent By Raj').desc())

known_contacts_df = sms_df.filter(sms_df['@contact_name'] != '(unknown)')
sms_count_df = (
    known_contacts_df
    .groupBy('@timestamp_year', '@contact_name')
    .pivot('@sent_by_me')
    .count()
)

(
    sms_count_df
    .withColumn("rank_sends_year", F.row_number().over(countWindowSpec))
    .show()
)

In [None]:
# Who have I messaged the most overall, with a rank column for each year
# from 2018 to 2022.
# Each rank_<year> column numbers the contacts by that year's pivoted count,
# highest first. The year columns come from the pivot, so each of these years
# must be present in the data for its column to exist.

countWindowYearSpec2022 = Window.partitionBy().orderBy(F.col('2022').desc())
countWindowYearSpec2021 = Window.partitionBy().orderBy(F.col('2021').desc())
countWindowYearSpec2020 = Window.partitionBy().orderBy(F.col('2020').desc())
countWindowYearSpec2019 = Window.partitionBy().orderBy(F.col('2019').desc())
countWindowYearSpec2018 = Window.partitionBy().orderBy(F.col('2018').desc())

(
    sms_df
    .groupBy('@contact_name')
    .pivot('@timestamp_year')
    .count()
    .withColumn('rank_2018', F.row_number().over(countWindowYearSpec2018))
    .withColumn('rank_2019', F.row_number().over(countWindowYearSpec2019))
    .withColumn('rank_2020', F.row_number().over(countWindowYearSpec2020))
    .withColumn('rank_2021', F.row_number().over(countWindowYearSpec2021))
    .withColumn('rank_2022', F.row_number().over(countWindowYearSpec2022))
    .show(30)
)

In [None]:
# Who have I messaged the most, broken down by weekday, within the year set
# in `Year` and ranked by the weekday set in `Weekday`.

Year = 2022
# Weekdays: Mon, Tue, Wed, Thu, Fri, Sat, Sun
Weekday = 'Fri'

# Number the contacts by their count on the chosen weekday, highest first.
countWindowSpecDay = Window.partitionBy().orderBy(F.col(Weekday).desc())

(
    sms_df
    .filter(F.col('@timestamp_year') == Year)
    .groupBy('@contact_name')
    .pivot('@timestamp_weekday')
    .count()
    .withColumn('rank_Day', F.row_number().over(countWindowSpecDay))
    .show(30)
)

In [None]:
# Any trend in what weekday I message most on generally? By percent of that year's messages.
#
# The unused `countWindowSpecDay` definition was removed from this cell: it
# was never referenced here and only made the cell depend on `Weekday` being
# defined by the previous cell.

# Total messages per year, used as the denominator for the percentage.
yearTotalWindowSpec = Window.partitionBy('@timestamp_year')

(
    sms_df
    .groupBy('@timestamp_weekday', '@timestamp_year')
    .count()
    # (weekday count / that year's total) * 100
    .withColumn('percent_of_year', F.col('count') / F.sum('count').over(yearTotalWindowSpec) * 100)
    # Within each year, list the busiest weekday first.
    .orderBy([F.col('@timestamp_year'), F.col('percent_of_year').desc()])
    .show(50)
)