In [2]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType

In [3]:
schema = StructType() \
  .add("trading_date_time",DateType(),True) \
  .add("network",StringType(),True) \
  .add("message_category",StringType(),True) \
  .add("message_type",IntegerType(),True) \
  .add("message_sequence",StringType(),True) \
  .add("market_exchange",StringType(),True) \
  .add("symbol",StringType(),True) \
  .add("trade_price",DoubleType(),True) \
  .add("trade_size",IntegerType(),True) \
  .add("trade_conditions",StringType(),True) \
  .add("trade_conditions2",StringType(),True)


In [10]:
jdbcDF = spark \
    .read \
    .option("header", False) \
    .schema(schema) \
    .csv("gs://dcd-bucket-jerf/dcd/data_trades/")


jdbcDF


DataFrame[trading_date_time: date, network: string, message_category: string, message_type: int, message_sequence: string, market_exchange: string, symbol: string, trade_price: double, trade_size: int, trade_conditions: string, trade_conditions2: string]

In [6]:
jdbcDF.printSchema()

root
 |-- trading_date_time: date (nullable = true)
 |-- network: string (nullable = true)
 |-- message_category: string (nullable = true)
 |-- message_type: integer (nullable = true)
 |-- message_sequence: string (nullable = true)
 |-- market_exchange: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- trade_price: double (nullable = true)
 |-- trade_size: integer (nullable = true)
 |-- trade_conditions: string (nullable = true)
 |-- trade_conditions2: string (nullable = true)



In [11]:
jdbcDF.count()

                                                                                

192220

In [12]:
jdbcDF.show()

                                                                                

+-----------------+-------+----------------+------------+----------------+---------------+------+-----------+----------+----------------+-----------------+
|trading_date_time|network|message_category|message_type|message_sequence|market_exchange|symbol|trade_price|trade_size|trade_conditions|trade_conditions2|
+-----------------+-------+----------------+------------+----------------+---------------+------+-----------+----------+----------------+-----------------+
|       2012-08-21|      E|               I|        null|       000000268|              P|   BMI|      190.0|       100|               T|                 |
|       2012-08-21|      E|               B|        null|       000000269|              P|   BMI|      190.0|        50|              TI|                 |
|       2012-08-21|      E|               B|        null|       000000270|              P|   BMI|     189.92|        50|              TI|                 |
|       2012-08-21|      E|               B|        null|       

In [13]:
jdbcDF.toPandas()

                                                                                

Unnamed: 0,trading_date_time,network,message_category,message_type,message_sequence,market_exchange,symbol,trade_price,trade_size,trade_conditions,trade_conditions2
0,2012-08-21,E,I,,000000268,P,BMI,190.0000,100,T,
1,2012-08-21,E,B,,000000269,P,BMI,190.0000,50,TI,
2,2012-08-21,E,B,,000000270,P,BMI,189.9200,50,TI,
3,2012-08-21,E,B,,000000392,P,BMI,190.6700,100,FT,
4,2012-08-21,E,I,,000002163,P,BMI,190.4300,600,T,
...,...,...,...,...,...,...,...,...,...,...,...
192215,2012-08-21,E,B,,001248752,D,TAC,107.9700,5,TI,T
192216,2012-08-21,E,B,,001248845,D,TAC,108.2595,685000,TB,T
192217,2012-08-21,E,B,,001248856,D,TAC,108.0028,76000,TB,T
192218,2012-08-21,E,I,,001249003,P,TAC,108.0500,100,T,


In [15]:
jdbcDF.take(5)

[Row(trading_date_time=datetime.date(2012, 8, 21), network='E', message_category='I', message_type=None, message_sequence='000000268', market_exchange='P', symbol='BMI', trade_price=190.0, trade_size=100, trade_conditions='T', trade_conditions2=' '),
 Row(trading_date_time=datetime.date(2012, 8, 21), network='E', message_category='B', message_type=None, message_sequence='000000269', market_exchange='P', symbol='BMI', trade_price=190.0, trade_size=50, trade_conditions='TI', trade_conditions2=' '),
 Row(trading_date_time=datetime.date(2012, 8, 21), network='E', message_category='B', message_type=None, message_sequence='000000270', market_exchange='P', symbol='BMI', trade_price=189.92, trade_size=50, trade_conditions='TI', trade_conditions2=' '),
 Row(trading_date_time=datetime.date(2012, 8, 21), network='E', message_category='B', message_type=None, message_sequence='000000392', market_exchange='P', symbol='BMI', trade_price=190.67, trade_size=100, trade_conditions='FT', trade_conditions2

In [16]:
jdbcDF.describe().toPandas().transpose()

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
network,192220,,,E,T
message_category,192220,,,A,W
message_type,0,,,,
message_sequence,192220,631829.217006555,433342.7226626963,000000268,01970506
market_exchange,192220,,,A,Z
symbol,192220,,,AVVB,TMSF
trade_price,192220,107.5473957413324,136.82105324874402,19.32,585.7
trade_size,192220,221.51110186244927,7573.449432427936,1,2141076
trade_conditions,192220,5.0,1.044465935734187,4,ZI


In [17]:
jdbcDF.describe(['trade_price']).show()



+-------+------------------+
|summary|       trade_price|
+-------+------------------+
|  count|            192220|
|   mean| 107.5473957413324|
| stddev|136.82105324874402|
|    min|             19.32|
|    max|             585.7|
+-------+------------------+



                                                                                

In [20]:
jdbcDF.orderBy(["trading_date_time", "symbol"], ascending=[1, 0]).take(10)


                                                                                

[Row(trading_date_time=datetime.date(2012, 8, 21), network='T', message_category='W', message_type=None, message_sequence='00000938', market_exchange='Q', symbol='TMSF', trade_price=44.97, trade_size=41, trade_conditions='@TI', trade_conditions2=None),
 Row(trading_date_time=datetime.date(2012, 8, 21), network='T', message_category='W', message_type=None, message_sequence='00000907', market_exchange='Q', symbol='TMSF', trade_price=44.98, trade_size=66, trade_conditions='@TI', trade_conditions2=None),
 Row(trading_date_time=datetime.date(2012, 8, 21), network='T', message_category='A', message_type=None, message_sequence='00000937', market_exchange='Q', symbol='TMSF', trade_price=44.97, trade_size=300, trade_conditions='T', trade_conditions2=None),
 Row(trading_date_time=datetime.date(2012, 8, 21), network='T', message_category='W', message_type=None, message_sequence='00000891', market_exchange='P', symbol='TMSF', trade_price=44.93, trade_size=34, trade_conditions='@FTI', trade_conditi

In [21]:
jdbcDF.groupBy("trading_date_time", "symbol").count().sort("trading_date_time", "symbol", ascending=[1, 0]).show()




+-----------------+------+-----+
|trading_date_time|symbol|count|
+-----------------+------+-----+
|       2012-08-21|  TMSF|80734|
|       2012-08-21|   TAC|21790|
|       2012-08-21|   RAD| 3816|
|       2012-08-21|  MOOT|13086|
|       2012-08-21|  HMAT| 7868|
|       2012-08-21|  GRIN| 2794|
|       2012-08-21|   BMI|21871|
|       2012-08-21|  AVVB|40261|
+-----------------+------+-----+



                                                                                

In [22]:
from pyspark.sql.functions import when
df2 = jdbcDF.select("symbol","trade_price","trade_size","market_exchange" \
      ,when(jdbcDF.market_exchange == "K","KALL") \
      .when(jdbcDF.market_exchange == "P","PAll") \
      .when(jdbcDF.market_exchange.isNull() ,"NO ESPECIFICADO") \
      .otherwise(jdbcDF.market_exchange).alias("market"))
df2.show()


+------+-----------+----------+---------------+------+
|symbol|trade_price|trade_size|market_exchange|market|
+------+-----------+----------+---------------+------+
|   BMI|      190.0|       100|              P|  PAll|
|   BMI|      190.0|        50|              P|  PAll|
|   BMI|     189.92|        50|              P|  PAll|
|   BMI|     190.67|       100|              P|  PAll|
|   BMI|     190.43|       600|              P|  PAll|
|   BMI|     190.43|        42|              P|  PAll|
|   BMI|     190.68|        40|              T|     T|
|   BMI|     190.67|        55|              P|  PAll|
|   BMI|     190.69|       105|              P|  PAll|
|   BMI|     190.69|       300|              P|  PAll|
|   BMI|     190.69|       100|              P|  PAll|
|   BMI|     190.69|        95|              P|  PAll|
|   BMI|     190.75|        75|              P|  PAll|
|   BMI|      190.9|       100|              T|     T|
|   BMI|     190.94|       200|              T|     T|
|   BMI|  

In [23]:
jdbcDF.selectExpr("*","trade_size * 10 AS trade_size10", "abs(trade_size) AS trade_size_abs").show()


+-----------------+-------+----------------+------------+----------------+---------------+------+-----------+----------+----------------+-----------------+------------+--------------+
|trading_date_time|network|message_category|message_type|message_sequence|market_exchange|symbol|trade_price|trade_size|trade_conditions|trade_conditions2|trade_size10|trade_size_abs|
+-----------------+-------+----------------+------------+----------------+---------------+------+-----------+----------+----------------+-----------------+------------+--------------+
|       2012-08-21|      E|               I|        null|       000000268|              P|   BMI|      190.0|       100|               T|                 |        1000|           100|
|       2012-08-21|      E|               B|        null|       000000269|              P|   BMI|      190.0|        50|              TI|                 |         500|            50|
|       2012-08-21|      E|               B|        null|       000000270|      

In [24]:
jdbcDF.where(jdbcDF.market_exchange == "T").show()


+-----------------+-------+----------------+------------+----------------+---------------+------+-----------+----------+----------------+-----------------+
|trading_date_time|network|message_category|message_type|message_sequence|market_exchange|symbol|trade_price|trade_size|trade_conditions|trade_conditions2|
+-----------------+-------+----------------+------------+----------------+---------------+------+-----------+----------+----------------+-----------------+
|       2012-08-21|      E|               B|        null|       000004752|              T|   BMI|     190.68|        40|             FTI|                 |
|       2012-08-21|      E|               I|        null|       000005122|              T|   BMI|      190.9|       100|               T|                 |
|       2012-08-21|      E|               I|        null|       000005123|              T|   BMI|     190.94|       200|               T|                 |
|       2012-08-21|      E|               I|        null|       

In [25]:
jdbcDF.where("market_exchange == 'T' AND trade_price = 190.9").show()


[Stage 29:>                                                         (0 + 1) / 1]

+-----------------+-------+----------------+------------+----------------+---------------+------+-----------+----------+----------------+-----------------+
|trading_date_time|network|message_category|message_type|message_sequence|market_exchange|symbol|trade_price|trade_size|trade_conditions|trade_conditions2|
+-----------------+-------+----------------+------------+----------------+---------------+------+-----------+----------+----------------+-----------------+
|       2012-08-21|      E|               I|        null|       000005122|              T|   BMI|      190.9|       100|               T|                 |
|       2012-08-21|      E|               I|        null|       000005190|              T|   BMI|      190.9|       100|               @|                 |
|       2012-08-21|      E|               I|        null|       000005191|              T|   BMI|      190.9|       100|               Q|                 |
|       2012-08-21|      E|               I|        null|       

                                                                                

In [26]:
jdbcDF.select("network", "message_category", "message_sequence")\
    .where((jdbcDF.market_exchange == "T") & (jdbcDF.trade_price == 190.9)) \
    .sort("message_sequence").show()


+-------+----------------+----------------+
|network|message_category|message_sequence|
+-------+----------------+----------------+
|      E|               I|       000005122|
|      E|               I|       000005190|
|      E|               I|       000005191|
|      E|               I|       000038357|
|      E|               I|       000038358|
|      E|               I|       000038359|
+-------+----------------+----------------+



In [27]:
from pyspark.sql.functions import countDistinct

jdbcDF.select("market_exchange", "symbol") \
  .groupBy("market_exchange")\
  .agg(countDistinct("symbol").alias("distinct_symbol")) \
  .show()


+---------------+---------------+
|market_exchange|distinct_symbol|
+---------------+---------------+
|              K|              8|
|              Q|              2|
|              T|              6|
|              B|              8|
|              Y|              8|
|              M|              5|
|              D|              8|
|              J|              8|
|              Z|              8|
|              N|              6|
|              X|              8|
|              A|              1|
|              P|              8|
+---------------+---------------+



In [28]:
print(jdbcDF.where((jdbcDF.market_exchange == "K")).count())


13032


In [29]:
jdbcDF.select("market_exchange", "symbol") \
    .where((jdbcDF.market_exchange == "K")) \
    .groupBy("market_exchange", "symbol").count().show()


+---------------+------+-----+
|market_exchange|symbol|count|
+---------------+------+-----+
|              K|  AVVB| 1874|
|              K|  MOOT|  852|
|              K|  HMAT| 1209|
|              K|  TMSF| 6001|
|              K|   RAD|  144|
|              K|   BMI| 1599|
|              K|   TAC| 1162|
|              K|  GRIN|  191|
+---------------+------+-----+



In [30]:
jdbcDF.select("market_exchange", "trade_price") \
    .where((jdbcDF.market_exchange == "K") & (jdbcDF.symbol == "GRIN")) \
    .groupBy("market_exchange", "trade_price").sum().show()


+---------------+-----------+------------------+
|market_exchange|trade_price|  sum(trade_price)|
+---------------+-----------+------------------+
|              K|      78.95|             78.95|
|              K|      78.69|            236.07|
|              K|      78.61|             78.61|
|              K|      78.92|            236.76|
|              K|      78.41|            313.64|
|              K|      78.71|            157.42|
|              K|      79.01|            158.02|
|              K|      78.66|393.29999999999995|
|              K|      79.25|             158.5|
|              K|      78.36|             78.36|
|              K|      78.62|             78.62|
|              K|      78.79|            157.58|
|              K|      79.02|            158.04|
|              K|      78.45|             470.7|
|              K|      78.57|235.70999999999998|
|              K|      78.39|             78.39|
|              K|      78.48|392.40000000000003|
|              K|   

In [34]:
from pyspark.sql.functions import col,sum,avg,max

jdbcDF.groupBy("market_exchange","symbol") \
    .agg(sum("trade_price").alias("sum_trade_price"), \
         avg("trade_price").alias("avg_trade_price"), \
         sum("trade_size").alias("sum_trade_size"), \
         max("trade_size").alias("max_trade_size") \
     ) \
    .filter(col("sum_trade_price") > 100000)  \
    .sort("sum_trade_price") \
    .show(truncate=False)


+---------------+------+------------------+------------------+--------------+--------------+
|market_exchange|symbol|sum_trade_price   |avg_trade_price   |sum_trade_size|max_trade_size|
+---------------+------+------------------+------------------+--------------+--------------+
|K              |AVVB  |103014.31499999992|54.97028548559227 |322326        |25000         |
|Y              |AVVB  |103048.57500000022|54.98856723585924 |197345        |1200          |
|B              |BMI   |106718.15500000019|191.59453321364487|45741         |1700          |
|D              |HMAT  |121312.86489999999|44.404416142020494|610860        |5000          |
|K              |TAC   |125733.66500000004|108.20453098106715|145573        |3937          |
|B              |TMSF  |148723.12000000113|45.136000000000344|491084        |2800          |
|J              |TMSF  |170228.53000000224|45.12951484623601 |517004        |2247          |
|Y              |BMI   |180093.16100000014|191.58846914893633|68914   

In [37]:
jdbcDF.write.partitionBy("symbol").saveAsTable("trade_hive") 


ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
22/06/04 15:14:30 WARN org.apache.hadoop.hive.ql.session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
