In [0]:
from pyspark import SparkContext

In [0]:
shares_rdd = sc.textFile('/FileStore/tables/sharemarket.csv')

In [0]:
shares_rdd.take(5)

Out[3]: ['N,N1,IRFC,BOND 8.00% PA TAX FREE S1,1086,1085,1085,1084.54,1085,3371959.44,3108, ,8,1194,1050',
 'N,N1,JNPT,BOND 6.82% PA TAX FREE S1,1001,1007.1,1015,1007.1,1015,51370,51, ,2,1529.99,1000',
 'N,N1,NHAI,BOND 8.20% PA TAX FREE S1,1091.38,1092,1092,1087,1089.98,3861145.49,3544, ,39,1140,1045',
 'N,N1,NTPC,8.41%S-R-NCD SERIES 1A,1198.99,1135,1135,1135,1135,113500,100, ,1,1275,1022.1',
 'N,N1,RECLTD,TAXFREE SEC NCD TR1 S1,1083.54,1088,1088,1085,1085,108530,100, ,2,1184.8,1041']

In [0]:
header = 'MARKET,SERIES,SYMBOL,SECURITY,PREV_CL_PR,OPEN_PRICE,HIGH_PRICE,LOW_PRICE,CLOSE_PRICE,NET_TRDVAL,NET_TRDQTY,CORP_IND,TRADES,HI_52_WK,LO_52_WK'

In [0]:
shares_rdd = shares_rdd.map(lambda x : x.split(','))

In [0]:
shares_rdd.take(5)

Out[6]: [['N',
  'N1',
  'IRFC',
  'BOND 8.00% PA TAX FREE S1',
  '1086',
  '1085',
  '1085',
  '1084.54',
  '1085',
  '3371959.44',
  '3108',
  ' ',
  '8',
  '1194',
  '1050'],
 ['N',
  'N1',
  'JNPT',
  'BOND 6.82% PA TAX FREE S1',
  '1001',
  '1007.1',
  '1015',
  '1007.1',
  '1015',
  '51370',
  '51',
  ' ',
  '2',
  '1529.99',
  '1000'],
 ['N',
  'N1',
  'NHAI',
  'BOND 8.20% PA TAX FREE S1',
  '1091.38',
  '1092',
  '1092',
  '1087',
  '1089.98',
  '3861145.49',
  '3544',
  ' ',
  '39',
  '1140',
  '1045'],
 ['N',
  'N1',
  'NTPC',
  '8.41%S-R-NCD SERIES 1A',
  '1198.99',
  '1135',
  '1135',
  '1135',
  '1135',
  '113500',
  '100',
  ' ',
  '1',
  '1275',
  '1022.1'],
 ['N',
  'N1',
  'RECLTD',
  'TAXFREE SEC NCD TR1 S1',
  '1083.54',
  '1088',
  '1088',
  '1085',
  '1085',
  '108530',
  '100',
  ' ',
  '2',
  '1184.8',
  '1041']]

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

schema = StructType([ \
    StructField("MARKET",StringType(),True), \
    StructField("SERIES",StringType(),True), \
    StructField("SYMBOL",StringType(),True), \
    StructField("SECURITY", StringType(), True), \
    StructField("PREV_CL_PR", StringType(), True), \
    StructField("OPEN_PRICE", StringType(), True), \
    StructField("HIGH_PRICE", StringType(), True), \
    StructField("LOW_PRICE", StringType(), True), \
    StructField("CLOSE_PRICE", StringType(), True), \
    StructField("NET_TRDVAL", StringType(), True) ,\
    StructField("NET_TRDQTY", StringType(), True) ,\
    StructField("CORP_IND", StringType(), True), \
    StructField("TRADES", StringType(), True), \
    StructField("HI_52_WK", StringType(), True), \
    StructField("LO_52_WK", StringType(), True) \
  ])

In [0]:
shares_df = spark.createDataFrame(shares_rdd, schema)

In [0]:
shares_df.show(5)

+------+------+------+--------------------+----------+----------+----------+---------+-----------+----------+----------+--------+------+--------+--------+
|MARKET|SERIES|SYMBOL|            SECURITY|PREV_CL_PR|OPEN_PRICE|HIGH_PRICE|LOW_PRICE|CLOSE_PRICE|NET_TRDVAL|NET_TRDQTY|CORP_IND|TRADES|HI_52_WK|LO_52_WK|
+------+------+------+--------------------+----------+----------+----------+---------+-----------+----------+----------+--------+------+--------+--------+
|     N|    N1|  IRFC|BOND 8.00% PA TAX...|      1086|      1085|      1085|  1084.54|       1085|3371959.44|      3108|        |     8|    1194|    1050|
|     N|    N1|  JNPT|BOND 6.82% PA TAX...|      1001|    1007.1|      1015|   1007.1|       1015|     51370|        51|        |     2| 1529.99|    1000|
|     N|    N1|  NHAI|BOND 8.20% PA TAX...|   1091.38|      1092|      1092|     1087|    1089.98|3861145.49|      3544|        |    39|    1140|    1045|
|     N|    N1|  NTPC|8.41%S-R-NCD SERI...|   1198.99|      1135|     

In [0]:
shares_df.createOrReplaceTempView('ShareMarket')

In [0]:
spark.sql('desc ShareMarket').show()

+-----------+---------+-------+
|   col_name|data_type|comment|
+-----------+---------+-------+
|     MARKET|   string|   null|
|     SERIES|   string|   null|
|     SYMBOL|   string|   null|
|   SECURITY|   string|   null|
| PREV_CL_PR|   string|   null|
| OPEN_PRICE|   string|   null|
| HIGH_PRICE|   string|   null|
|  LOW_PRICE|   string|   null|
|CLOSE_PRICE|   string|   null|
| NET_TRDVAL|   string|   null|
| NET_TRDQTY|   string|   null|
|   CORP_IND|   string|   null|
|     TRADES|   string|   null|
|   HI_52_WK|   string|   null|
|   LO_52_WK|   string|   null|
+-----------+---------+-------+



In [0]:
spark.sql('Select * from ShareMarket limit 5').show()

+------+------+------+--------------------+----------+----------+----------+---------+-----------+----------+----------+--------+------+--------+--------+
|MARKET|SERIES|SYMBOL|            SECURITY|PREV_CL_PR|OPEN_PRICE|HIGH_PRICE|LOW_PRICE|CLOSE_PRICE|NET_TRDVAL|NET_TRDQTY|CORP_IND|TRADES|HI_52_WK|LO_52_WK|
+------+------+------+--------------------+----------+----------+----------+---------+-----------+----------+----------+--------+------+--------+--------+
|     N|    N1|  IRFC|BOND 8.00% PA TAX...|      1086|      1085|      1085|  1084.54|       1085|3371959.44|      3108|        |     8|    1194|    1050|
|     N|    N1|  JNPT|BOND 6.82% PA TAX...|      1001|    1007.1|      1015|   1007.1|       1015|     51370|        51|        |     2| 1529.99|    1000|
|     N|    N1|  NHAI|BOND 8.20% PA TAX...|   1091.38|      1092|      1092|     1087|    1089.98|3861145.49|      3544|        |    39|    1140|    1045|
|     N|    N1|  NTPC|8.41%S-R-NCD SERI...|   1198.99|      1135|     

In [0]:
# 1.Query to display the number of series present in the data.

result1 = spark.sql('Select count(distinct series) as Total_Count from ShareMarket')
result1.show()
result1.coalesce(1).write.format("csv").option("header", "true").mode("append").save("/FileStore/tables/output1.txt")

+-----------+
|Total_Count|
+-----------+
|         53|
+-----------+



In [0]:
# 2.Display the series present in the data.(using hive)

result2 = spark.sql('Select Distinct(Series) from ShareMarket')
result2.show()
result2.coalesce(1).write.format("csv").option("header", "true").mode("append").save("/FileStore/tables/output2.txt")

+------+
|Series|
+------+
|    NA|
|    N3|
|    NC|
|    N7|
|    N8|
|    N2|
|    N5|
|    N4|
|    N6|
|    N1|
|    N9|
|    NB|
|    NS|
|    NL|
|    NK|
|    NJ|
|    NX|
|    NW|
|    NP|
|    NH|
+------+
only showing top 20 rows



In [0]:
# 3.Find the sumpof all the prices in the each series.(Using hive)

result3 = spark.sql('Select series, sum(prev_cl_pr), sum(open_price), sum(high_price), sum(low_price), sum(close_price) from ShareMarket GROUP BY series')
result3.show()
result3.coalesce(1).write.format("csv").option("header", "true").mode("append").save("/FileStore/tables/output3.txt")

+------+------------------+------------------+------------------+------------------+-----------------+
|series|   sum(prev_cl_pr)|   sum(open_price)|   sum(high_price)|    sum(low_price)| sum(close_price)|
+------+------------------+------------------+------------------+------------------+-----------------+
|    NA|          19861.54|           19748.0|20098.340000000004|           19723.0|         20059.57|
|    N3|           2092.27|           2086.24|2088.6000000000004|            2085.9|          2087.38|
|    NC|          14393.07|          14319.21|           14391.7|           14318.5|          14382.9|
|    N7|           3218.36|           3232.41|           3290.55|           3232.41|          3290.53|
|    N8|            8170.6|           8148.98|           8175.86|           8127.61|8130.780000000001|
|    N2|10527.279999999999|          10518.07|           10528.4|10491.460000000001|         10510.92|
|    N5|           8853.06|           8720.56|            8850.5|        

In [0]:
# 4.Display security,series with highest net trade value(use pyspark)

result4 = spark.sql('Select series, security, net_trdval from ShareMarket where net_trdval = (Select Max(net_trdval) from ShareMarket)')
result4.show(truncate=False)
result4.coalesce(1).write.format("csv").option("header", "true").mode("append").save("/FileStore/tables/output4.txt")

+------+-------------------------+----------+
|series|security                 |net_trdval|
+------+-------------------------+----------+
|N4    |SEC RED NCD 9.75 % SR.III|972888.7  |
+------+-------------------------+----------+



In [0]:
# 5.Display the series whose sum of all prices greater than the net trade value.(Using pyspark)

result5 = spark.sql("select SERIES, round(PREV_CL_PR + OPEN_PRICE + HIGH_PRICE + LOW_PRICE + CLOSE_PRICE) as Total_price, NET_TRDVAL from ShareMarket where (PREV_CL_PR + OPEN_PRICE + HIGH_PRICE + LOW_PRICE + CLOSE_PRICE) > NET_TRDVAL ")
result5.show()
result5.coalesce(1).write.format("csv").option("header", "true").mode("append").save("/FileStore/tables/output5.txt")

+------+-----------+----------+
|SERIES|Total_price|NET_TRDVAL|
+------+-----------+----------+
|    N2|     6103.0|      3666|
|    N2|     5683.0|      4548|
|    N4|     5109.0|   2044.02|
|    N4|     5159.0|    4124.2|
|    N5|     4825.0|    1912.4|
|    N6|     6700.0|      1341|
|    N9|    26340.0|     21060|
|    NB|     4609.0|      1843|
|    Y7|     1555.0|      1500|
+------+-----------+----------+



In [0]:
# 6.Display the series with highest net trade quantity.(Using pyspark)
result6 = spark.sql('SELECT series, net_trdqty from ShareMarket Where net_trdqty = (Select max(net_trdqty) from ShareMarket)')
result6.show()
result6.coalesce(1).write.format("csv").option("header", "true").mode("append").save("/FileStore/tables/output6.txt")

+------+----------+
|series|net_trdqty|
+------+----------+
|    NH|       989|
+------+----------+



In [0]:
# 7. Display the highest and lowest open price(Using sql)
result7 = spark.sql('Select max(open_price), min(open_price) from ShareMarket')
result7.show()
result7.coalesce(1).write.format("csv").option("header", "true").mode("append").save("/FileStore/tables/output7.txt")

+---------------+---------------+
|max(open_price)|min(open_price)|
+---------------+---------------+
|         998.99|           1000|
+---------------+---------------+



In [0]:
# 8.Query to display the series which have trades more than 80.(Using SQL).
result8 = spark.sql('Select series,trades from ShareMarket where trades > 80')
result8.show()
result8.coalesce(1).write.format("csv").option("header", "true").mode("append").save("/FileStore/tables/output8.txt")

+------+------+
|series|trades|
+------+------+
|    N2|   107|
|    N3|   787|
|    N7|   106|
+------+------+



In [0]:
# 9.Display the difference between the net trade value net trade quantity for each series.(Using sql).
result9 = spark.sql('Select series, sum(round(net_trdval - net_trdqty)) as diff_net_trd_and_qty from ShareMarket Group By Series')
result9.show()
result9.coalesce(1).write.format("csv").option("header", "true").mode("append").save("/FileStore/tables/output9.txt")

+------+--------------------+
|series|diff_net_trd_and_qty|
+------+--------------------+
|    NA|           3331954.0|
|    N3|           5894946.0|
|    NC|           2505038.0|
|    N7|           2218463.0|
|    N8|           3982021.0|
|    N2|           3674572.0|
|    N5|           5581855.0|
|    N4|           3474030.0|
|    N6|         1.6785783E7|
|    N1|           7499601.0|
|    N9|            455371.0|
|    NB|           2430892.0|
|    YH|             61270.0|
|    NS|            354014.0|
|    NL|           1581526.0|
|    NK|           1010122.0|
|    Z4|             36957.0|
|    NJ|           7421504.0|
|    NX|             12729.0|
|    YS|             79055.0|
+------+--------------------+
only showing top 20 rows

