In [83]:
import os
import sys
# Point this kernel at the local Spark installation and put Spark's bundled
# Python libraries (py4j first, then pyspark) at the front of the import path.
os.environ['SPARK_HOME'] = '/home/samar/spark'
os.environ['PYLIB'] = '{}/python/lib'.format(os.environ['SPARK_HOME'])
sys.path[0:0] = [
    os.environ['PYLIB'] + '/py4j-0.10.9-src.zip',
    os.environ['PYLIB'] + '/pyspark.zip',
]

In [84]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

In [85]:
# Create the Spark session used by the rest of the notebook, or reuse one that
# already exists.
spark = (
    SparkSession.builder
    .appName('SparkOptimizations')
    .getOrCreate()
)

In [86]:
# Sanity check: with the session built above, `spark` should be usable straight away.
# Evaluating the version as the cell's last expression displays it.
spark.version

'3.0.0'

In [87]:
from pyspark.sql.functions import *

In [88]:
# Location (local `file://` URI) of the cash market CSV data that is loaded below
cm_file_location = 'file:///home/samar/201819/cm'

In [89]:
# Read the cash market CSV data: the first row supplies the column names and
# Spark infers the column types from the data.
cm_df = spark.read.csv(cm_file_location, header=True, inferSchema=True)

In [90]:
# Peek at the first two rows to confirm the load worked
cm_df.show(n=2)

+----------+------+-----+-----+-----+-----+-----+---------+---------+---------+-----------+-----------+------------+----+
|    SYMBOL|SERIES| OPEN| HIGH|  LOW|CLOSE| LAST|PREVCLOSE|TOTTRDQTY|TOTTRDVAL|  TIMESTAMP|TOTALTRADES|        ISIN|_c13|
+----------+------+-----+-----+-----+-----+-----+---------+---------+---------+-----------+-----------+------------+----+
| 20MICRONS|    EQ| 49.3| 49.5|45.45|46.65|46.95|     48.4|    87346|4139659.6|21-SEP-2018|        625|INE144J01027|null|
|21STCENMGM|    EQ|31.15|31.75|31.15|31.75|31.75|    31.75|      801|  25024.2|21-SEP-2018|         11|INE253B01015|null|
+----------+------+-----+-----+-----+-----+-----+---------+---------+---------+-----------+-----------+------------+----+
only showing top 2 rows



In [91]:
# Drop `_c13`, the unnamed extra column that came out of the CSV read
# (it is null in the rows displayed above), then print the resulting schema.
# NOTE(review): presumably caused by a trailing delimiter in the files - verify.
cmdf = cm_df.drop("_c13")
print("The cash market data frame schema")
cmdf.printSchema()

The cash market data frame schema
root
 |-- SYMBOL: string (nullable = true)
 |-- SERIES: string (nullable = true)
 |-- OPEN: double (nullable = true)
 |-- HIGH: double (nullable = true)
 |-- LOW: double (nullable = true)
 |-- CLOSE: double (nullable = true)
 |-- LAST: double (nullable = true)
 |-- PREVCLOSE: double (nullable = true)
 |-- TOTTRDQTY: integer (nullable = true)
 |-- TOTTRDVAL: double (nullable = true)
 |-- TIMESTAMP: string (nullable = true)
 |-- TOTALTRADES: integer (nullable = true)
 |-- ISIN: string (nullable = true)



In [92]:
# a function to replace month names with numbers
def mnameToNo(dt):
    """Replace the three-letter month name in a ``DD-MON-YYYY`` string with its number.

    The month abbreviation is expected at character positions 3-5, e.g.
    ``'12-JAN-2016'`` becomes ``'12-01-2016'``. Only that slice is rewritten;
    the rest of the string is returned unchanged.

    Args:
        dt: Date string such as ``'21-SEP-2018'``, or ``None``. The month
            abbreviation is matched case-insensitively.

    Returns:
        The date string with the month abbreviation replaced by a two-digit
        month number, or ``None`` when ``dt`` is ``None``.

    Raises:
        KeyError: If characters 3-5 are not a recognised month abbreviation.
    """
    # When used as a Spark UDF, a null column value arrives as None; pass it
    # through instead of failing the whole job on the slice below.
    if dt is None:
        return None
    calendar = {"JAN": "01", "FEB": "02", "MAR": "03", "APR": "04",
                "MAY": "05", "JUN": "06", "JUL": "07", "AUG": "08",
                "SEP": "09", "OCT": "10", "NOV": "11", "DEC": "12"}
    mname = dt[3:6]
    # Splice by position rather than str.replace so that only the month field
    # is touched, even if the same letters occur elsewhere in the string.
    return dt[:3] + calendar[mname.upper()] + dt[6:]

In [93]:
# Verify the function on a sample date: 'JAN' should come back as '01'
mnameToNo('12-JAN-2016')

'12-01-2016'

In [94]:
# Wrap the Python function in a Spark UDF so it can be applied to a dataframe column.
# No return type is passed, so udf() uses its default return type.
udf_mname_to_no = udf(mnameToNo)

In [95]:
# Add `tsp`, a real timestamp column for the cash market data: the UDF turns the
# month name in TIMESTAMP into a number, and the result is parsed as day-month-year.
cmdfvw = cmdf.withColumn(
    "tsp", to_timestamp(udf_mname_to_no(col("TIMESTAMP")), "dd-MM-yyyy")
)

print("verifying the extra column we added as a proper timestamp")
cmdfvw.limit(5).show()

verifying the extra column we added as a proper timestamp
+----------+------+-------+-------+-------+--------+-------+---------+---------+------------+-----------+-----------+------------+-------------------+
|    SYMBOL|SERIES|   OPEN|   HIGH|    LOW|   CLOSE|   LAST|PREVCLOSE|TOTTRDQTY|   TOTTRDVAL|  TIMESTAMP|TOTALTRADES|        ISIN|                tsp|
+----------+------+-------+-------+-------+--------+-------+---------+---------+------------+-----------+-----------+------------+-------------------+
| 20MICRONS|    EQ|   49.3|   49.5|  45.45|   46.65|  46.95|     48.4|    87346|   4139659.6|21-SEP-2018|        625|INE144J01027|2018-09-21 00:00:00|
|21STCENMGM|    EQ|  31.15|  31.75|  31.15|   31.75|  31.75|    31.75|      801|     25024.2|21-SEP-2018|         11|INE253B01015|2018-09-21 00:00:00|
|3IINFOTECH|    EQ|   3.85|    3.9|   3.45|    3.55|    3.6|     3.85|  5907575| 2.1564435E7|21-SEP-2018|       1876|INE748C01020|2018-09-21 00:00:00|
|   3MINDIA|    EQ|24699.5|24850.0|2

In [96]:
# Target location (local `file://` URI) where the cash market dataframe is saved as parquet
cmdf_parquet_save_location = 'file:///home/samar/cmdf_parquet'

In [97]:
# Persist the cash market dataframe (including the derived `tsp` column) as parquet,
# overwriting any previous output. The format is named explicitly instead of relying
# on the session's default data source, because the data is read back with
# spark.read.parquet.
cmdfvw.write.mode('overwrite').parquet(cmdf_parquet_save_location)

In [98]:
# Read the cash market dataframe back from the parquet location it was saved to
cmdf_parquet = spark.read.parquet(cmdf_parquet_save_location)

In [99]:
# Location (local `file://` URI) of the price volume CSV data
prvol_file_location = 'file:///home/samar/201819/prvolmod'

In [100]:
# Explicit schema for the price volume delivery dataframe, listed as
# (column name, Spark type) pairs in file column order.
from pyspark.sql.types import *
prvolSchema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("rectype", StringType()),
        ("srno", IntegerType()),
        ("symbol", StringType()),
        ("series", StringType()),
        ("traded", IntegerType()),
        ("deliverable", IntegerType()),
        ("delper", DoubleType()),
        ("tsp", TimestampType()),
    )
])

In [101]:
# Read the price volume CSV data using the explicit schema instead of inferring types.
# The date pattern uses `MM` (month-of-year); lower-case `mm` means minute-of-hour
# in Spark datetime patterns, so 'dd-mm-yyyy' never described a day-month-year date.
# NOTE(review): `tsp` is declared as TimestampType, and Spark's CSV reader documents
# `timestampFormat` (not `dateFormat`) as the option governing timestamp columns -
# confirm which pattern the files actually need.
prvol_df = (
    spark.read
    .schema(prvolSchema)
    .option('dateFormat', 'dd-MM-yyyy')
    .csv(prvol_file_location)
)

In [102]:
# Location to save it as parquet so that we can load it directly later
prvol_df_parquet_location = 'file:///home/samar/prvol_df_parquet'

In [103]:
# Write the price volume dataframe out as parquet, replacing any earlier output
prvol_df.write.parquet(prvol_df_parquet_location, mode='overwrite')

In [104]:
# Read the price volume dataframe back from disk as parquet
prvoldf_parquet = spark.read.parquet(prvol_df_parquet_location)

In [107]:
# Join the two parquet-backed dataframes on symbol and timestamp and print the
# physical plan (explain only; the join itself is not executed here)
(
    cmdf_parquet
    .join(prvoldf_parquet, on=['symbol', 'tsp'])
    .explain()
)

== Physical Plan ==
*(5) Project [SYMBOL#5552, tsp#5565, SERIES#5553, OPEN#5554, HIGH#5555, LOW#5556, CLOSE#5557, LAST#5558, PREVCLOSE#5559, TOTTRDQTY#5560, TOTTRDVAL#5561, TIMESTAMP#5562, TOTALTRADES#5563, ISIN#5564, rectype#5692, srno#5693, series#5695, traded#5696, deliverable#5697, delper#5698]
+- *(5) SortMergeJoin [SYMBOL#5552, tsp#5565], [symbol#5694, tsp#5699], Inner
   :- *(2) Sort [SYMBOL#5552 ASC NULLS FIRST, tsp#5565 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(SYMBOL#5552, tsp#5565, 200), true, [id=#1825]
   :     +- *(1) Project [SYMBOL#5552, SERIES#5553, OPEN#5554, HIGH#5555, LOW#5556, CLOSE#5557, LAST#5558, PREVCLOSE#5559, TOTTRDQTY#5560, TOTTRDVAL#5561, TIMESTAMP#5562, TOTALTRADES#5563, ISIN#5564, tsp#5565]
   :        +- *(1) Filter (isnotnull(SYMBOL#5552) AND isnotnull(tsp#5565))
   :           +- *(1) ColumnarToRow
   :              +- FileScan parquet [SYMBOL#5552,SERIES#5553,OPEN#5554,HIGH#5555,LOW#5556,CLOSE#5557,LAST#5558,PREVCLOSE#5559,TOTTRDQT

In [106]:
# Execute the join and check the shuffle.
# Repeated executions should see the shuffle occurring every time.
cmdf_parquet.join(prvoldf_parquet, ['symbol', 'tsp']).show(2)

+----------+-------------------+------+----+----+----+-----+----+---------+---------+------------+-----------+-----------+------------+-------+----+------+--------+-----------+------+
|    SYMBOL|                tsp|SERIES|OPEN|HIGH| LOW|CLOSE|LAST|PREVCLOSE|TOTTRDQTY|   TOTTRDVAL|  TIMESTAMP|TOTALTRADES|        ISIN|rectype|srno|series|  traded|deliverable|delper|
+----------+-------------------+------+----+----+----+-----+----+---------+---------+------------+-----------+-----------+------------+-------+----+------+--------+-----------+------+
|21STCENMGM|2019-10-24 00:00:00|    EQ|12.9|12.9|12.9| 12.9|12.9|     12.9|      400|      5160.0|24-OCT-2019|          4|INE253B01015|     20|   2|    EQ|     400|        400| 100.0|
|3IINFOTECH|2018-07-13 00:00:00|    EQ| 4.1| 4.3|3.65| 3.75| 3.7|     4.05| 15374953|6.03156685E7|13-JUL-2018|       8873|INE748C01020|     20|   3|    EQ|15374953|    7443771| 48.41|
+----------+-------------------+------+----+----+----+-----+----+---------+-----

In [108]:
# Repartition the cash market dataframe into 4 partitions keyed on the join
# columns, then mark the result for caching
cmdf_rep_cached = cmdf_parquet.repartition(4, 'symbol', 'tsp').cache()

In [109]:
# Repartition the price volume dataframe the same way (4 partitions on the join
# columns) so both sides share a common partitioning, then mark it for caching
prvoldf_rep_cached = prvoldf_parquet.repartition(4, 'symbol', 'tsp').cache()

In [110]:
# Join the two dataframes that were repartitioned on the join columns and cached,
# and print the physical plan
(
    cmdf_rep_cached
    .join(prvoldf_rep_cached, on=['symbol', 'tsp'])
    .explain()
)

== Physical Plan ==
*(3) Project [SYMBOL#5552, tsp#5565, SERIES#5553, OPEN#5554, HIGH#5555, LOW#5556, CLOSE#5557, LAST#5558, PREVCLOSE#5559, TOTTRDQTY#5560, TOTTRDVAL#5561, TIMESTAMP#5562, TOTALTRADES#5563, ISIN#5564, rectype#5692, srno#5693, series#5695, traded#5696, deliverable#5697, delper#5698]
+- *(3) SortMergeJoin [SYMBOL#5552, tsp#5565], [symbol#5694, tsp#5699], Inner
   :- *(1) Sort [SYMBOL#5552 ASC NULLS FIRST, tsp#5565 ASC NULLS FIRST], false, 0
   :  +- *(1) Filter (isnotnull(SYMBOL#5552) AND isnotnull(tsp#5565))
   :     +- InMemoryTableScan [SYMBOL#5552, SERIES#5553, OPEN#5554, HIGH#5555, LOW#5556, CLOSE#5557, LAST#5558, PREVCLOSE#5559, TOTTRDQTY#5560, TOTTRDVAL#5561, TIMESTAMP#5562, TOTALTRADES#5563, ISIN#5564, tsp#5565], [isnotnull(SYMBOL#5552), isnotnull(tsp#5565)]
   :           +- InMemoryRelation [SYMBOL#5552, SERIES#5553, OPEN#5554, HIGH#5555, LOW#5556, CLOSE#5557, LAST#5558, PREVCLOSE#5559, TOTTRDQTY#5560, TOTTRDVAL#5561, TIMESTAMP#5562, TOTALTRADES#5563, ISIN#5564

In [21]:
# Check repeated executions: after the first execution we should see 0 shuffle
cmdf_rep_cached.join(prvoldf_rep_cached, ['symbol', 'tsp']).show()

+----------+-------------------+------+-----+-----+-----+-----+-----+---------+---------+---------+-----------+-----------+------------+-------+----+------+------+-----------+------+
|    SYMBOL|                tsp|SERIES| OPEN| HIGH|  LOW|CLOSE| LAST|PREVCLOSE|TOTTRDQTY|TOTTRDVAL|  TIMESTAMP|TOTALTRADES|        ISIN|rectype|srno|series|traded|deliverable|delper|
+----------+-------------------+------+-----+-----+-----+-----+-----+---------+---------+---------+-----------+-----------+------------+-------+----+------+------+-----------+------+
| 20MICRONS|2019-01-14 00:00:00|    EQ| 42.7| 42.7|41.25|41.85|41.95|    41.85|    15544|649079.05|14-JAN-2019|        243|INE144J01027|     20|   2|    EQ| 15544|       8370| 53.85|
| 20MICRONS|2019-07-08 00:00:00|    EQ|37.45| 40.1| 35.3| 36.1| 36.5|     37.3|    49705|1876631.9|08-JUL-2019|        548|INE144J01027|     20|   2|    EQ| 49705|      18911| 38.05|
| 20MICRONS|2019-07-11 00:00:00|    EQ|37.35| 37.7|36.05|36.35| 36.8|    35.95|    18

In [23]:
# Control experiment: cache WITHOUT repartitioning, to verify that the effect is on
# account of partitioning and caching together and not just caching
cmdf_cached = cmdf_parquet.select('*').cache()

In [24]:
# Compare storage levels: the plain parquet-backed dataframe first, then its cached copy
print(cmdf_parquet.storageLevel, cmdf_cached.storageLevel, sep='\n')

Serialized 1x Replicated
Disk Memory Deserialized 1x Replicated


In [25]:
# Same control for the price volume dataframe: cached, but not repartitioned
prvoldf_cached = prvoldf_parquet.select('*').cache()

In [26]:
# Compare storage levels: the plain parquet-backed dataframe first, then its cached copy
print(prvoldf_parquet.storageLevel, prvoldf_cached.storageLevel, sep='\n')

Serialized 1x Replicated
Disk Memory Deserialized 1x Replicated


In [29]:
# Run the same join on the cached-but-not-repartitioned dataframes for comparison
(
    cmdf_cached
    .join(prvoldf_cached, on=['symbol', 'tsp'])
    .show(n=2)
)

+----------+-------------------+------+----+----+----+-----+----+---------+---------+------------+-----------+-----------+------------+-------+----+------+--------+-----------+------+
|    SYMBOL|                tsp|SERIES|OPEN|HIGH| LOW|CLOSE|LAST|PREVCLOSE|TOTTRDQTY|   TOTTRDVAL|  TIMESTAMP|TOTALTRADES|        ISIN|rectype|srno|series|  traded|deliverable|delper|
+----------+-------------------+------+----+----+----+-----+----+---------+---------+------------+-----------+-----------+------------+-------+----+------+--------+-----------+------+
|21STCENMGM|2019-10-24 00:00:00|    EQ|12.9|12.9|12.9| 12.9|12.9|     12.9|      400|      5160.0|24-OCT-2019|          4|INE253B01015|     20|   2|    EQ|     400|        400| 100.0|
|3IINFOTECH|2018-07-13 00:00:00|    EQ| 4.1| 4.3|3.65| 3.75| 3.7|     4.05| 15374953|6.03156685E7|13-JUL-2018|       8873|INE748C01020|     20|   3|    EQ|15374953|    7443771| 48.41|
+----------+-------------------+------+----+----+----+-----+----+---------+-----

In [30]:
# Grouping and aggregation on the partitioning columns will benefit as well.
# `sum` here is the Spark SQL function brought in by the star import of
# pyspark.sql.functions, not the Python builtin.
(
    cmdf_rep_cached
    .groupBy('symbol', 'tsp')
    .agg(sum('tottrdqty').alias('tottrdqty'))
    .explain()
)

== Physical Plan ==
*(1) HashAggregate(keys=[symbol#0, tsp#13], functions=[sum(cast(tottrdqty#8 as bigint))])
+- *(1) HashAggregate(keys=[symbol#0, tsp#13], functions=[partial_sum(cast(tottrdqty#8 as bigint))])
   +- InMemoryTableScan [SYMBOL#0, TOTTRDQTY#8, tsp#13]
         +- InMemoryRelation [SYMBOL#0, SERIES#1, OPEN#2, HIGH#3, LOW#4, CLOSE#5, LAST#6, PREVCLOSE#7, TOTTRDQTY#8, TOTTRDVAL#9, TIMESTAMP#10, TOTALTRADES#11, ISIN#12, tsp#13], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- Exchange hashpartitioning(symbol#0, tsp#13, 4), false, [id=#83]
                  +- *(1) ColumnarToRow
                     +- FileScan parquet [SYMBOL#0,SERIES#1,OPEN#2,HIGH#3,LOW#4,CLOSE#5,LAST#6,PREVCLOSE#7,TOTTRDQTY#8,TOTTRDVAL#9,TIMESTAMP#10,TOTALTRADES#11,ISIN#12,tsp#13] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/D:/tmp/cmdf_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<SYMBOL:string,SERIES:string,OPEN:double,H

In [35]:
# Execute the aggregation on the repartitioned, cached dataframe
(
    cmdf_rep_cached
    .groupBy('symbol', 'tsp')
    .agg(sum('tottrdqty').alias('tottrdqty'))
    .show(n=2)
)

875309

In [37]:
# Run the same aggregation on the unpartitioned, uncached dataframe for comparison
(
    cmdf_parquet
    .groupBy('symbol', 'tsp')
    .agg(sum('tottrdqty').alias('tottrdqty'))
    .show(n=2)
)

+---------+-------------------+---------+
|   symbol|                tsp|tottrdqty|
+---------+-------------------+---------+
|SOLARINDS|2019-03-15 00:00:00|     2748|
|DIAMONDYD|2019-12-30 00:00:00|     2055|
+---------+-------------------+---------+
only showing top 2 rows



In [62]:
# Save the cash market data as a bucketed table: 4 buckets on (symbol, tsp),
# sorted by the same columns, replacing the table if it already exists
(
    cmdf_parquet.write
    .bucketBy(4, 'symbol', 'tsp')
    .sortBy('symbol', 'tsp')
    .mode('overwrite')
    .saveAsTable('cmdfbkt')
)

In [47]:
# List the tables known to the session; the bucketed table should appear here
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|  cmdfbkt|      false|
+--------+---------+-----------+



In [63]:
# Read the bucketed table back as a dataframe
cmdf_bkt = spark.read.table('cmdfbkt')

In [64]:
# Check the grouping/aggregation plan on the bucketed table:
# the exchange (hash partitioning) step is gone
(
    cmdf_bkt
    .groupBy('symbol', 'tsp')
    .agg(sum('tottrdqty').alias('totqty'))
    .explain()
)

== Physical Plan ==
*(1) HashAggregate(keys=[symbol#4229, tsp#4242], functions=[sum(cast(tottrdqty#4237 as bigint))])
+- *(1) HashAggregate(keys=[symbol#4229, tsp#4242], functions=[partial_sum(cast(tottrdqty#4237 as bigint))])
   +- *(1) ColumnarToRow
      +- FileScan parquet default.cmdfbkt[SYMBOL#4229,TOTTRDQTY#4237,tsp#4242] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/SAMAR/OneDrive/spark_python/ntbk/spark-warehouse/cmdfbkt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<SYMBOL:string,TOTTRDQTY:int,tsp:timestamp>, SelectedBucketsCount: 200 out of 200




In [115]:
# Build a small dataframe to try out a broadcast hash join: the average
# delivery percentage per symbol, restricted to the EQ series
prvoldf_avg = (
    prvoldf_parquet
    .filter("series='EQ'")
    .groupBy('symbol')
    .agg(avg('delper').alias('delper'))
)

In [117]:
# Check the dataframe (first two rows only)
prvoldf_avg.show(2)

+----------+-----------------+
|    symbol|           delper|
+----------+-----------------+
|  GODREJCP| 56.7804674796748|
|HITECHCORP|73.43516326530613|
+----------+-----------------+
only showing top 2 rows



In [119]:
# Join the cash market dataframe with the small average-delivery-percentage
# dataframe on symbol and print the physical plan
(
    cmdf_parquet
    .join(prvoldf_avg, on='symbol')
    .explain()
)

== Physical Plan ==
*(3) Project [SYMBOL#5552, SERIES#5553, OPEN#5554, HIGH#5555, LOW#5556, CLOSE#5557, LAST#5558, PREVCLOSE#5559, TOTTRDQTY#5560, TOTTRDVAL#5561, TIMESTAMP#5562, TOTALTRADES#5563, ISIN#5564, tsp#5565, delper#6298]
+- *(3) BroadcastHashJoin [SYMBOL#5552], [symbol#5694], Inner, BuildRight
   :- *(3) Project [SYMBOL#5552, SERIES#5553, OPEN#5554, HIGH#5555, LOW#5556, CLOSE#5557, LAST#5558, PREVCLOSE#5559, TOTTRDQTY#5560, TOTTRDVAL#5561, TIMESTAMP#5562, TOTALTRADES#5563, ISIN#5564, tsp#5565]
   :  +- *(3) Filter isnotnull(SYMBOL#5552)
   :     +- *(3) ColumnarToRow
   :        +- FileScan parquet [SYMBOL#5552,SERIES#5553,OPEN#5554,HIGH#5555,LOW#5556,CLOSE#5557,LAST#5558,PREVCLOSE#5559,TOTTRDQTY#5560,TOTTRDVAL#5561,TIMESTAMP#5562,TOTALTRADES#5563,ISIN#5564,tsp#5565] Batched: true, DataFilters: [isnotnull(SYMBOL#5552)], Format: Parquet, Location: InMemoryFileIndex[file:/D:/tmp/cmdf_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(SYMBOL)], ReadSchema: struct<SYMBOL:s

In [120]:
# Execute the join and verify in the Spark UI: no shuffle, straight broadcast
# (the plan printed above uses BroadcastHashJoin)
cmdf_parquet.join(prvoldf_avg, 'symbol').show()

+----------+------+-------+-------+-------+--------+-------+---------+---------+---------------+-----------+-----------+------------+-------------------+------------------+
|    SYMBOL|SERIES|   OPEN|   HIGH|    LOW|   CLOSE|   LAST|PREVCLOSE|TOTTRDQTY|      TOTTRDVAL|  TIMESTAMP|TOTALTRADES|        ISIN|                tsp|            delper|
+----------+------+-------+-------+-------+--------+-------+---------+---------+---------------+-----------+-----------+------------+-------------------+------------------+
| 20MICRONS|    EQ|   34.0|  34.55|   32.5|   32.95|   32.7|    34.15|    20773|       694054.3|21-AUG-2019|        269|INE144J01027|2019-08-21 00:00:00|           64.5408|
|3IINFOTECH|    EQ|   1.95|    2.0|    1.9|    1.95|   1.95|      1.9|   680794|     1312807.65|21-AUG-2019|        422|INE748C01020|2019-08-21 00:00:00|52.961686991869925|
|   3MINDIA|    EQ|20848.0|20848.0|20300.0|20315.85|20300.0|  20614.4|      732|   1.49590743E7|21-AUG-2019|        340|INE470A01017|20