### 1. Set Up the Spark Environment

In [1]:
import pandas as pd
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Build (or reuse) a local Spark session. getOrCreate() keeps this cell safe to
# re-run: constructing SparkContext('local') a second time in the same kernel
# raises "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master('local').getOrCreate()
# Keep `sc` available for any code that expects the underlying SparkContext.
sc = spark.sparkContext

### 2. Read the SQL Script from a Local Text File

### Note: the query counts the number of transactions for each tradeCurrency

In [2]:
# Read the query text from disk. The context manager closes the file handle as
# soon as the read finishes (the original left it open for the kernel's lifetime).
with open('/Users/AmandaJia/Desktop/MPS_Spark/sql_query', 'r') as sql_file:
    # NOTE(review): only the first line is read, so the query is presumably
    # stored on a single line — confirm against the file.
    sql_query = sql_file.readline()

### 3. Generate the Spark Dataframe

In [3]:
# Load the mock trade data as a Spark DataFrame, treating the first CSV row as the header.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("file:/Users/AmandaJia/Desktop/MPS_Spark/mockdata.csv")
)

In [4]:
# Print a text-table preview of the loaded DataFrame.
df.show()

+---------+--------+-------+--------+-----------+---------+--------+-------------+
|AssetType|customer|TradeId|AggGroup|tradingBook|StartDate| endDate|tradeCurrency|
+---------+--------+-------+--------+-----------+---------+--------+-------------+
|  IR_SWAP|      C0|     T0|      A0|        TB0| 20190807|20210322|          USD|
|  IR_SWAP|      C1|     T1|      A1|        TB0| 20190807|20191206|          AUD|
|  IR_SWAP|      C0|     T2|      A0|        TB0| 20190807|20210322|          USD|
|  IR_SWAP|      C0|     T3|      A0|        TB0| 20190807|20210322|          USD|
|  IR_SWAP|      C1|     T4|      A1|        TB0| 20190807|20240621|          AUD|
|  IR_SWAP|      C0|     T5|      A0|        TB0| 20190807|20210322|          USD|
|  IR_SWAP|      C1|     T6|      A1|        TB0| 20190807|20191001|          AUD|
|  IR_SWAP|      C1|     T7|      A1|        TB0| 20190807|20200629|          NZD|
|  IR_SWAP|      C1|     T8|      A1|        TB0| 20190807|20240411|          AUD|
|  I

### 4. Run the SQL Script Using Spark

In [5]:
# Register df under the name "transactions" so SQL text can refer to it as a table.
df.createOrReplaceTempView("transactions")
# Run the query read from the SQL file.
# NOTE(review): presumably the query selects from "transactions" — confirm against the file.
results = spark.sql(sql_query)

In [6]:
# Display the rows produced by the SQL query.
results.show()

+-------------+-------------+
|tradeCurrency|num_of_trades|
+-------------+-------------+
|          AUD|           46|
|          SGD|            3|
|          NZD|       251417|
|          NZD|           40|
|          CNY|       249613|
|          AUD|       249375|
|          USD|           11|
|          USD|       249795|
+-------------+-------------+



# Comparison of Spark_SQL & Pandas DataFrame

### Reading files

In [7]:
# Pandas read: load each benchmark CSV into its own DataFrame.
# The file names are relative, so they resolve against the kernel's working directory.
filenames = [
    'mockdata1.csv',
    'mockdata2.csv',
    'mockdata3.csv',
    'mockdata4.csv',
]
pd_dfs = [pd.read_csv(csv_name) for csv_name in filenames]

In [8]:
# Spark read: load mockdata1.csv .. mockdata4.csv as Spark DataFrames,
# treating the first row of each file as the header.
spark_dfs = [
    spark.read
    .format("csv")
    .option("header", "true")
    .load("file:/Users/AmandaJia/Desktop/MPS_Spark/mockdata%s.csv" % str(file_no))
    for file_no in range(1, 5)
]

### Timing for the Pandas_dataframe

In [9]:
%%time
pd_dfs[0].groupby('tradeCurrency').count()[['customer']]

CPU times: user 473 ms, sys: 279 ms, total: 752 ms
Wall time: 864 ms


Unnamed: 0_level_0,customer
tradeCurrency,Unnamed: 1_level_1
AUD,250269
NZD,249205
SGD,250466
USD,250160


In [10]:
%%time
pd_dfs[1].groupby('tradeCurrency').count()[['customer']]

CPU times: user 845 ms, sys: 428 ms, total: 1.27 s
Wall time: 1.21 s


Unnamed: 0_level_0,customer
tradeCurrency,Unnamed: 1_level_1
AUD,499800
NZD,499852
SGD,500359
USD,500089


In [11]:
%%time
pd_dfs[2].groupby('tradeCurrency').count()[['customer']]

CPU times: user 1.06 s, sys: 514 ms, total: 1.57 s
Wall time: 1.71 s


Unnamed: 0_level_0,customer
tradeCurrency,Unnamed: 1_level_1
AUD,605710
NZD,605612
SGD,606615
USD,607063


In [12]:
%%time
pd_dfs[3].groupby('tradeCurrency').count()[['customer']]

CPU times: user 3.39 s, sys: 882 ms, total: 4.27 s
Wall time: 5.01 s


Unnamed: 0_level_0,customer
tradeCurrency,Unnamed: 1_level_1
AUD,1997628
NZD,2000061
SGD,2002125
USD,2000185


### Timing for Spark_SQL

In [13]:
%%time
spark_dfs[0].createOrReplaceTempView("transactions")
results = spark.sql(sql_query)
#results.show()

CPU times: user 1.63 ms, sys: 1.44 ms, total: 3.07 ms
Wall time: 42.4 ms


In [14]:
%%time
spark_dfs[1].createOrReplaceTempView("transactions")
results = spark.sql(sql_query)
#results.show()

CPU times: user 2.25 ms, sys: 2.01 ms, total: 4.26 ms
Wall time: 35.1 ms


In [15]:
%%time
spark_dfs[2].createOrReplaceTempView("transactions")
results = spark.sql(sql_query)
#results.show()

CPU times: user 1.5 ms, sys: 1.1 ms, total: 2.6 ms
Wall time: 35.7 ms


In [16]:
%%time
spark_dfs[3].createOrReplaceTempView("transactions")
results = spark.sql(sql_query)
#results.show()

CPU times: user 1.83 ms, sys: 1.29 ms, total: 3.12 ms
Wall time: 41.8 ms


## Past Comparison (single mockdata.csv dataset)

In [17]:
%%time
#df.createOrReplaceTempView("transactions")
results = spark.sql(sql_query)
#results.show()

CPU times: user 1.75 ms, sys: 1.47 ms, total: 3.22 ms
Wall time: 11.6 ms


In [18]:
# NOTE(review): pandas is already imported as `pd` in the first cell; this repeat is redundant.
import pandas as pd
# Load the CSV with pandas; the relative path resolves against the kernel's working directory.
df_pd = pd.read_csv('mockdata.csv')

In [19]:
%%time
df_pd.groupby('tradeCurrency').count()[['customer']]

CPU times: user 415 ms, sys: 43 ms, total: 458 ms
Wall time: 456 ms


Unnamed: 0_level_0,customer
tradeCurrency,Unnamed: 1_level_1
AUD,46
NZD,40
SGD,3
USD,11
AUD,249375
CNY,249613
NZD,251417
USD,249795
