### Ways to access a column in Pyspark

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession. \
builder. \
appName("Ways to access a column in Pyspark"). \
config("spark.sql.warehouse.dir","/user/itv020649/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [3]:
spark

In [4]:
orders_schema = 'order_id long,order_date date,cust_id long,order_status string'

In [5]:
orders_df = spark.read \
.format("csv") \
.schema(orders_schema) \
.load("/public/trendytech/orders/orders_1gb.csv")

In [6]:
orders_df.show()

+--------+----------+-------+---------------+
|order_id|order_date|cust_id|   order_status|
+--------+----------+-------+---------------+
|       1|2013-07-25|  11599|         CLOSED|
|       2|2013-07-25|    256|PENDING_PAYMENT|
|       3|2013-07-25|  12111|       COMPLETE|
|       4|2013-07-25|   8827|         CLOSED|
|       5|2013-07-25|  11318|       COMPLETE|
|       6|2013-07-25|   7130|       COMPLETE|
|       7|2013-07-25|   4530|       COMPLETE|
|       8|2013-07-25|   2911|     PROCESSING|
|       9|2013-07-25|   5657|PENDING_PAYMENT|
|      10|2013-07-25|   5648|PENDING_PAYMENT|
|      11|2013-07-25|    918| PAYMENT_REVIEW|
|      12|2013-07-25|   1837|         CLOSED|
|      13|2013-07-25|   9149|PENDING_PAYMENT|
|      14|2013-07-25|   9842|     PROCESSING|
|      15|2013-07-25|   2568|       COMPLETE|
|      16|2013-07-25|   7276|PENDING_PAYMENT|
|      17|2013-07-25|   2667|       COMPLETE|
|      18|2013-07-25|   1205|         CLOSED|
|      19|2013-07-25|   9488|PENDI

In [7]:
orders_df.select("order_id","order_date").show()
#accessing a column in string notation""

+--------+----------+
|order_id|order_date|
+--------+----------+
|       1|2013-07-25|
|       2|2013-07-25|
|       3|2013-07-25|
|       4|2013-07-25|
|       5|2013-07-25|
|       6|2013-07-25|
|       7|2013-07-25|
|       8|2013-07-25|
|       9|2013-07-25|
|      10|2013-07-25|
|      11|2013-07-25|
|      12|2013-07-25|
|      13|2013-07-25|
|      14|2013-07-25|
|      15|2013-07-25|
|      16|2013-07-25|
|      17|2013-07-25|
|      18|2013-07-25|
|      19|2013-07-25|
|      20|2013-07-25|
+--------+----------+
only showing top 20 rows



In [8]:
from pyspark.sql.functions import *

In [9]:
orders_df.select("order_id",orders_df.order_date,orders_df['order_date'],column('cust_id'),col('cust_id'),expr("order_status")).show()

+--------+----------+----------+-------+-------+---------------+
|order_id|order_date|order_date|cust_id|cust_id|   order_status|
+--------+----------+----------+-------+-------+---------------+
|       1|2013-07-25|2013-07-25|  11599|  11599|         CLOSED|
|       2|2013-07-25|2013-07-25|    256|    256|PENDING_PAYMENT|
|       3|2013-07-25|2013-07-25|  12111|  12111|       COMPLETE|
|       4|2013-07-25|2013-07-25|   8827|   8827|         CLOSED|
|       5|2013-07-25|2013-07-25|  11318|  11318|       COMPLETE|
|       6|2013-07-25|2013-07-25|   7130|   7130|       COMPLETE|
|       7|2013-07-25|2013-07-25|   4530|   4530|       COMPLETE|
|       8|2013-07-25|2013-07-25|   2911|   2911|     PROCESSING|
|       9|2013-07-25|2013-07-25|   5657|   5657|PENDING_PAYMENT|
|      10|2013-07-25|2013-07-25|   5648|   5648|PENDING_PAYMENT|
|      11|2013-07-25|2013-07-25|    918|    918| PAYMENT_REVIEW|
|      12|2013-07-25|2013-07-25|   1837|   1837|         CLOSED|
|      13|2013-07-25|2013

1. column string - "cust_id"
2. column object - col("cust_id")
3. column expression - expr()

1. "order_id" - column string
2. order_df.order_date - like resolving amgiguity in case of joining tables
3. column('cust_id') - column object - .where(col('order_status').like('PENDING%'))
4. expr("order_status") - column expression -order_df.select("order_id",expr("cust_id + 1 as new_cust_id))

In [10]:
#column expression
orders_df.select("order_id",expr("cust_id + 1 as new_cust_id")).show()

+--------+-----------+
|order_id|new_cust_id|
+--------+-----------+
|       1|      11600|
|       2|        257|
|       3|      12112|
|       4|       8828|
|       5|      11319|
|       6|       7131|
|       7|       4531|
|       8|       2912|
|       9|       5658|
|      10|       5649|
|      11|        919|
|      12|       1838|
|      13|       9150|
|      14|       9843|
|      15|       2569|
|      16|       7277|
|      17|       2668|
|      18|       1206|
|      19|       9489|
|      20|       9199|
+--------+-----------+
only showing top 20 rows



In [11]:
#column object
orders_df.select("order_id",orders_df.order_date,orders_df['order_date'],column('cust_id'),col('cust_id'),expr("order_status")).where(col('order_status').like('PENDING%')).show()

+--------+----------+----------+-------+-------+---------------+
|order_id|order_date|order_date|cust_id|cust_id|   order_status|
+--------+----------+----------+-------+-------+---------------+
|       2|2013-07-25|2013-07-25|    256|    256|PENDING_PAYMENT|
|       9|2013-07-25|2013-07-25|   5657|   5657|PENDING_PAYMENT|
|      10|2013-07-25|2013-07-25|   5648|   5648|PENDING_PAYMENT|
|      13|2013-07-25|2013-07-25|   9149|   9149|PENDING_PAYMENT|
|      16|2013-07-25|2013-07-25|   7276|   7276|PENDING_PAYMENT|
|      19|2013-07-25|2013-07-25|   9488|   9488|PENDING_PAYMENT|
|      21|2013-07-25|2013-07-25|   2711|   2711|        PENDING|
|      23|2013-07-25|2013-07-25|   4367|   4367|PENDING_PAYMENT|
|      27|2013-07-25|2013-07-25|   3241|   3241|PENDING_PAYMENT|
|      30|2013-07-25|2013-07-25|  10039|  10039|PENDING_PAYMENT|
|      33|2013-07-25|2013-07-25|   5793|   5793|PENDING_PAYMENT|
|      36|2013-07-25|2013-07-25|   5649|   5649|        PENDING|
|      39|2013-07-25|2013

## Aggregate functions

1. Simple aggregations - one o/p - count,sum,max,min
1. grouping aggregations - group by - catorigical calculation -
1. windowing aggregations


In [12]:
orders_df1 = spark.read \
.format("csv") \
.option("inferSchema","true") \
.option("header","true") \
.load("/public/trendytech/datasets/order_data.csv")

In [13]:
orders_df1.show()

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536378|     null|PACK OF 60 DINOSA...|      24|01-12-2010 9.37|     0.55|     14688|United Kingdom|
|   536378|     null|PACK OF 60 PINK P...|      24|01-12-2010 9.37|     0.55|     14688|United Kingdom|
|   536378|    84991|60 TEATIME FAIRY ...|      24|01-12-2010 9.37|     0.55|     14688|United Kingdom|
|   536378|   84519A|TOMATO CHARLIE+LO...|       6|01-12-2010 9.37|     2.95|     14688|United Kingdom|
|   536378|   85183B|CHARLIE & LOLA WA...|      48|01-12-2010 9.37|     1.25|     14688|United Kingdom|
|   536378|   85071B|RED CHARLIE+LOLA ...|      96|01-12-2010 9.37|     0.38|     14688|United Kingdom|
|   536378|    21931|JUMBO STORAGE BAG...|      10|01-12-2010 9.

In [14]:
from pyspark.sql.functions import *

In [15]:
#progamatic style
orders_df1.select(count("*").alias("total count"),countDistinct("InvoiceNo").alias("distinct invoice"),sum("Quantity").alias("sum quantity"),avg("UnitPrice").alias("avg price")).show()

+-----------+----------------+------------+-----------------+
|total count|distinct invoice|sum quantity|        avg price|
+-----------+----------------+------------+-----------------+
|     541782|           25858|     5175855|4.611565323321933|
+-----------+----------------+------------+-----------------+



In [16]:
#sql style in DF
orders_df1.selectExpr("count(*) as row_count","count(distinct InvoiceNo) as unique_invoice","sum(Quantity) as sum_quantity","avg(UnitPrice) as avgerage")

row_count,unique_invoice,sum_quantity,avgerage
541782,25858,5175855,4.611565323321931


In [17]:
orders_df1.createOrReplaceTempView("orders")

In [18]:
#sql style in table
spark.sql("select count(*) as row_count,count(distinct InvoiceNo) as unique_invoice,sum(Quantity) as sum_quantity,avg(UnitPrice) as avgerage from orders")

row_count,unique_invoice,sum_quantity,avgerage
541782,25858,5175855,4.611565323321925


In [19]:
from pyspark.sql.functions import *

when we do count(*) it ignores null, but if we do count(column) is doesnot ignore NULL

### grouping aggreation

In [20]:
#programatic style
summery_df= orders_df1 \
.groupBy("country","invoiceno") \
.agg(sum("quantity").alias("total_quantity"),sum(expr("quantity * unitprice")).alias("invoice_value")).sort("invoiceno")
#calculation inside sum need use of "expr"

In [21]:
summery_df.show()

+--------------+---------+--------------+------------------+
|       country|invoiceno|total_quantity|     invoice_value|
+--------------+---------+--------------+------------------+
|United Kingdom|   536378|           242|192.78000000000003|
|United Kingdom|   536380|            24|              34.8|
|United Kingdom|   536381|           198|449.97999999999996|
|United Kingdom|   536382|           134|430.59999999999997|
|United Kingdom|   536384|           190|             489.6|
|United Kingdom|   536385|            53|            130.85|
|United Kingdom|   536386|           236|508.20000000000005|
|United Kingdom|   536387|          1440|           3193.92|
|United Kingdom|   536388|           108|            226.14|
|     Australia|   536389|           107|            358.25|
|United Kingdom|   536390|          1568|           1825.74|
|United Kingdom|   536392|           103|318.14000000000004|
|United Kingdom|   536393|             8|              79.6|
|United Kingdom|   53639

In [22]:
#sql style
orders_df1 \
.groupBy("country","invoiceno") \
.agg(expr("sum(quantity) as total_quantity"), expr("sum(quantity * unitprice) as invoice_value")).sort("invoiceno")

country,invoiceno,total_quantity,invoice_value
United Kingdom,536378,242,192.78000000000003
United Kingdom,536380,24,34.8
United Kingdom,536381,198,449.98
United Kingdom,536382,134,430.6
United Kingdom,536384,190,489.6
United Kingdom,536385,53,130.85
United Kingdom,536386,236,508.2000000000001
United Kingdom,536387,1440,3193.92
United Kingdom,536388,108,226.14
Australia,536389,107,358.25


In [23]:
#spark sql
orders_df1.createOrReplaceTempView("orders2")

In [24]:
spark.sql("""select country, invoiceno
,sum(quantity) as total_quantity
,sum(quantity * unitprice) as invoice_value from orders2 
group by country, invoiceno""").show()
#use """ """ to run in multiple line in spark sql

+--------------+---------+--------------+------------------+
|       country|invoiceno|total_quantity|     invoice_value|
+--------------+---------+--------------+------------------+
|United Kingdom|   536446|           329|            440.89|
|United Kingdom|   536508|           216|            155.52|
|United Kingdom|   537811|            74|            268.86|
|United Kingdom|   538895|           370|            247.38|
|United Kingdom|   540453|           341|302.44999999999993|
|United Kingdom|   541291|           217|305.81000000000006|
|United Kingdom|   542551|            -1|               0.0|
|United Kingdom|   542576|            -1|               0.0|
|United Kingdom|   542628|             9|            132.35|
|United Kingdom|   542886|           199| 320.5099999999998|
|United Kingdom|   542907|            75|            313.85|
|United Kingdom|   543131|           134|             164.1|
|United Kingdom|   543189|           102|            153.94|
|United Kingdom|   54326

### Windowing aggrgerate

In [25]:
orders_df3 = spark.read \
.format("csv") \
.option("inferSchema","true") \
.option("header","true") \
.load("/public/trendytech/datasets/windowdata.csv")

In [26]:
orders_df3.sort("country").show()

+---------------+-------+-----------+-------------+------------+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|
+---------------+-------+-----------+-------------+------------+
|      Australia|     49|          1|          214|       258.9|
|      Australia|     48|          1|          107|      358.25|
|      Australia|     50|          2|          133|      387.95|
|        Austria|     50|          2|            3|      257.04|
|        Bahrain|     51|          1|           54|      205.74|
|        Belgium|     48|          1|          528|       346.1|
|        Belgium|     50|          2|          285|      625.16|
|        Belgium|     51|          2|          942|      838.65|
|Channel Islands|     49|          1|           80|      363.53|
|         Cyprus|     50|          1|          917|     1590.82|
|        Denmark|     49|          1|          454|      1281.5|
|        Finland|     50|          1|         1254|       892.8|
|         France|     49|

|        Germany|     48|         11|         1795|     3309.75|

|        Germany|     49|         12|         1852|     4521.39|

|        Germany|     50|         15|         1973|     5065.79|

|        Germany|     51|          5|         1103|     1665.91|

1. Partition by country : Window.partitionBy("country") \
2. sort based on week : .orderBy("weeknum") \
3. window size: .rowsBetween(Window.unboundedPreceding,Window.currentRow) 


In [27]:
from pyspark.sql import *
#running tatal

In [28]:
my_window = Window.partitionBy("country") \
.orderBy("weeknum") \
.rowsBetween(Window.unboundedPreceding,Window.currentRow) 
#unboundedPreceding first row of the group
#.rowsBetween(-2,Window.currentRow) - this means current row and two previous row


In [29]:
results_df = orders_df3.withColumn("running_column",sum("invoicevalue").over(my_window))

In [30]:
results_df.show()

+-------+-------+-----------+-------------+------------+------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|    running_column|
+-------+-------+-----------+-------------+------------+------------------+
| Sweden|     50|          3|         3714|      2646.3|            2646.3|
|Germany|     48|         11|         1795|     3309.75|           3309.75|
|Germany|     49|         12|         1852|     4521.39|           7831.14|
|Germany|     50|         15|         1973|     5065.79|          12896.93|
|Germany|     51|          5|         1103|     1665.91|          14562.84|
| France|     48|          4|         1299|     2808.16|           2808.16|
| France|     49|          9|         2303|     4527.01|           7335.17|
| France|     50|          6|          529|      537.32|           7872.49|
| France|     51|          5|          847|     1702.87|           9575.36|
|Belgium|     48|          1|          528|       346.1|             346.1|
|Belgium|   

#### More in windowing funtion

1. rank
1. dense rank
1. rwo number
1. lead
1. lag

In [31]:
spark

In [32]:
orders_df4 = spark.read \
.format("csv") \
.option("inferSchema","true") \
.option("header","true") \
.load("/public/trendytech/datasets/windowdatamodified.csv")

In [33]:
orders_df4.orderBy("country","invoicevalue").show(50)

+---------------+-------+-----------+-------------+------------+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|
+---------------+-------+-----------+-------------+------------+
|      Australia|     49|          1|          214|       258.9|
|      Australia|     48|          1|          107|      358.25|
|      Australia|     50|          2|          133|      387.95|
|        Austria|     50|          2|            3|      257.04|
|        Bahrain|     51|          1|           54|      205.74|
|        Belgium|     50|          2|          285|      625.16|
|        Belgium|     51|          2|          942|       800.0|
|        Belgium|     48|          1|          528|       800.0|
|Channel Islands|     49|          1|           80|      363.53|
|         Cyprus|     50|          1|          917|     1590.82|
|        Denmark|     49|          1|          454|      1281.5|
|        Finland|     50|          1|         1254|       892.8|
|         France|     48|

In [34]:
#running total defing window= first row to current row
my_window1 = Window.partitionBy("country") \
.orderBy("weeknum") \
.rowsBetween(Window.unboundedPreceding,Window.currentRow)

In [35]:
results4 = orders_df4.withColumn("running_total",sum("invoicevalue").over(my_window1))

In [36]:
results4.show()

+-------+-------+-----------+-------------+------------+------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|
+-------+-------+-----------+-------------+------------+------------------+
| Sweden|     50|          3|         3714|      2646.3|            2646.3|
|Germany|     48|         11|         1795|      1600.0|            1600.0|
|Germany|     49|         12|         1852|      1800.0|            3400.0|
|Germany|     50|         15|         1973|      1800.0|            5200.0|
|Germany|     51|          5|         1103|      1600.0|            6800.0|
| France|     48|          4|         1299|       500.0|             500.0|
| France|     49|          9|         2303|       500.0|            1000.0|
| France|     50|          6|          529|      537.32|1537.3200000000002|
| France|     51|          5|          847|       500.0|2037.3200000000002|
|Belgium|     48|          1|          528|       800.0|             800.0|
|Belgium|   

In [37]:
my_window5 = Window.partitionBy("country") \
.orderBy(desc("invoicevalue"))

In [38]:
rank_result = orders_df4.withColumn("rank",rank().over(my_window5))
#rank we skip when tie

In [39]:
rank_result.show()

+-------+-------+-----------+-------------+------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|rank|
+-------+-------+-----------+-------------+------------+----+
| Sweden|     50|          3|         3714|      2646.3|   1|
|Germany|     49|         12|         1852|      1800.0|   1|
|Germany|     50|         15|         1973|      1800.0|   1|
|Germany|     48|         11|         1795|      1600.0|   3|
|Germany|     51|          5|         1103|      1600.0|   3|
| France|     50|          6|          529|      537.32|   1|
| France|     51|          5|          847|       500.0|   2|
| France|     49|          9|         2303|       500.0|   2|
| France|     48|          4|         1299|       500.0|   2|
|Belgium|     48|          1|          528|       800.0|   1|
|Belgium|     51|          2|          942|       800.0|   1|
|Belgium|     50|          2|          285|      625.16|   3|
|Finland|     50|          1|         1254|       892.8|   1|
|  India

In [40]:
rownumber_result = orders_df4.withColumn("rank",row_number().over(my_window5))
#row number for top nth

In [41]:
rownumber_result.select("*").where("rank==1").drop("rank").sort("country").show

<bound method DataFrame.show of +---------------+-------+-----------+-------------+------------+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|
+---------------+-------+-----------+-------------+------------+
|      Australia|     50|          2|          133|      387.95|
|        Austria|     50|          2|            3|      257.04|
|        Bahrain|     51|          1|           54|      205.74|
|        Belgium|     48|          1|          528|       800.0|
|Channel Islands|     49|          1|           80|      363.53|
|         Cyprus|     50|          1|          917|     1590.82|
|        Denmark|     49|          1|          454|      1281.5|
|        Finland|     50|          1|         1254|       892.8|
|         France|     50|          6|          529|      537.32|
|        Germany|     49|         12|         1852|      1800.0|
|        Iceland|     49|          1|          319|      711.79|
|          India|     49|          5|         1280|      3

### understanding lag lead

In [42]:
my_window6 = Window.partitionBy("country") \
.orderBy("weeknum")

In [43]:
result_lag = orders_df4.withColumn("previous_week",lag("invoicevalue").over(my_window6))

In [44]:
result_lag.withColumn("invoice_diff",expr("invoicevalue - previous_week"))

country,weeknum,numinvoices,totalquantity,invoicevalue,previous_week,invoice_diff
Sweden,50,3,3714,2646.3,,
Germany,48,11,1795,1600.0,,
Germany,49,12,1852,1800.0,1600.0,200.0
Germany,50,15,1973,1800.0,1800.0,0.0
Germany,51,5,1103,1600.0,1800.0,-200.0
France,48,4,1299,500.0,,
France,49,9,2303,500.0,500.0,0.0
France,50,6,529,537.32,500.0,37.32000000000005
France,51,5,847,500.0,537.32,-37.32000000000005
Belgium,48,1,528,800.0,,


In [45]:
result_lag.show()

+-------+-------+-----------+-------------+------------+-------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|previous_week|
+-------+-------+-----------+-------------+------------+-------------+
| Sweden|     50|          3|         3714|      2646.3|         null|
|Germany|     48|         11|         1795|      1600.0|         null|
|Germany|     49|         12|         1852|      1800.0|       1600.0|
|Germany|     50|         15|         1973|      1800.0|       1800.0|
|Germany|     51|          5|         1103|      1600.0|       1800.0|
| France|     48|          4|         1299|       500.0|         null|
| France|     49|          9|         2303|       500.0|        500.0|
| France|     50|          6|          529|      537.32|        500.0|
| France|     51|          5|          847|       500.0|       537.32|
|Belgium|     48|          1|          528|       800.0|         null|
|Belgium|     50|          2|          285|      625.16|        800.0|
|Belgi

In [46]:
result_lead = orders_df4.withColumn("next_week",lag("invoicevalue").over(my_window6))

In [47]:
result_lead.show()

+-------+-------+-----------+-------------+------------+---------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|next_week|
+-------+-------+-----------+-------------+------------+---------+
| Sweden|     50|          3|         3714|      2646.3|     null|
|Germany|     48|         11|         1795|      1600.0|     null|
|Germany|     49|         12|         1852|      1800.0|   1600.0|
|Germany|     50|         15|         1973|      1800.0|   1800.0|
|Germany|     51|          5|         1103|      1600.0|   1800.0|
| France|     48|          4|         1299|       500.0|     null|
| France|     49|          9|         2303|       500.0|    500.0|
| France|     50|          6|          529|      537.32|    500.0|
| France|     51|          5|          847|       500.0|   537.32|
|Belgium|     48|          1|          528|       800.0|     null|
|Belgium|     50|          2|          285|      625.16|    800.0|
|Belgium|     51|          2|          942|       800.0|   625

#### Analyzing log files

log data files stored in HDFS

/public/trendytech/datasets/logdata1m.csv

we need to analyse these logs to find some inference

January error 10000

December 20000

12 months

Five different levels

60 output rows

In [48]:
! hdfs dfs -head /public/trendytech/datasets/logdata1m.csv

INFO,2015-8-8 20:49:22
WARN,2015-1-14 20:05:00
INFO,2017-6-14 00:08:35
INFO,2016-1-18 11:50:14
DEBUG,2017-7-1 12:55:02
INFO,2014-2-26 12:34:21
INFO,2015-7-12 11:13:47
INFO,2017-4-15 01:20:18
DEBUG,2016-11-2 20:19:23
INFO,2012-8-20 10:09:44
DEBUG,2014-4-22 21:30:49
WARN,2013-12-6 17:54:15
DEBUG,2017-1-12 10:47:02
DEBUG,2016-6-25 11:06:42
ERROR,2015-6-28 19:25:05
DEBUG,2012-6-24 01:06:37
INFO,2014-12-9 09:53:54
DEBUG,2015-11-8 19:20:08
INFO,2017-7-21 18:34:18
DEBUG,2014-12-26 06:38:42
DEBUG,2013-1-6 16:56:43
INFO,2015-10-8 11:33:25
INFO,2016-11-18 09:47:31
DEBUG,2015-2-6 16:24:07
WARN,2016-7-26 18:54:43
INFO,2012-10-18 14:35:19
DEBUG,2012-4-26 14:26:50
DEBUG,2013-9-28 20:27:13
INFO,2017-8-20 13:17:27
INFO,2015-4-13 09:28:17
DEBUG,2015-7-17 00:49:27
DEBUG,2014-7-26 02:33:09
INFO,2016-1-13 09:51:57
DEBUG,2015-1-14 08:55:30
DEBUG,2016-1-20 03:47:06
DEBUG,2013-7-8 21:00:50
DEBUG,2012-5-22 11:43:57
DEBUG,2013-3-20 06:14:50
DEBUG,2017-7-13 15:35:11
DEBUG,2013-1-21 20:20:25
DEBU

In [49]:
logs_data = [
    ('INFO', '2015-8-8 20:49:22'),
    ('WARN', '2015-1-14 20:05:00'),
    ('INFO', '2017-6-14 00:08:35'),
    ('INFO', '2016-1-18 11:50:14'),
    ('DEBUG', '2017-7-1 12:55:02'),
    ('INFO', '2014-2-26 12:34:21'),
    ('INFO', '2015-7-12 11:13:47'),
    ('INFO', '2017-4-15 01:20:18'),
    ('DEBUG', '2016-11-2 20:19:23'),
    ('INFO', '2012-8-20 10:09:44'),
    ('DEBUG', '2014-4-22 21:30:49'),
    ('WARN', '2013-12-6 17:54:15')
]


In [50]:
log_df = spark.createDataFrame(logs_data).toDF('loglevel','logtime')

In [51]:
log_df.show()

+--------+------------------+
|loglevel|           logtime|
+--------+------------------+
|    INFO| 2015-8-8 20:49:22|
|    WARN|2015-1-14 20:05:00|
|    INFO|2017-6-14 00:08:35|
|    INFO|2016-1-18 11:50:14|
|   DEBUG| 2017-7-1 12:55:02|
|    INFO|2014-2-26 12:34:21|
|    INFO|2015-7-12 11:13:47|
|    INFO|2017-4-15 01:20:18|
|   DEBUG|2016-11-2 20:19:23|
|    INFO|2012-8-20 10:09:44|
|   DEBUG|2014-4-22 21:30:49|
|    WARN|2013-12-6 17:54:15|
+--------+------------------+



In [52]:
log_df.printSchema()

root
 |-- loglevel: string (nullable = true)
 |-- logtime: string (nullable = true)



In [53]:
from pyspark.sql.functions import *

In [54]:
new_log_df = log_df.withColumn("logtime",to_timestamp("logtime"))

In [55]:
new_log_df.printSchema()

root
 |-- loglevel: string (nullable = true)
 |-- logtime: timestamp (nullable = true)



In [56]:
new_log_df.createOrReplaceTempView("serverLogs")

In [57]:
spark.sql("select * from serverLogs").show()

+--------+-------------------+
|loglevel|            logtime|
+--------+-------------------+
|    INFO|2015-08-08 20:49:22|
|    WARN|2015-01-14 20:05:00|
|    INFO|2017-06-14 00:08:35|
|    INFO|2016-01-18 11:50:14|
|   DEBUG|2017-07-01 12:55:02|
|    INFO|2014-02-26 12:34:21|
|    INFO|2015-07-12 11:13:47|
|    INFO|2017-04-15 01:20:18|
|   DEBUG|2016-11-02 20:19:23|
|    INFO|2012-08-20 10:09:44|
|   DEBUG|2014-04-22 21:30:49|
|    WARN|2013-12-06 17:54:15|
+--------+-------------------+



In [58]:
spark.sql("select loglevel, date_format(logtime, 'MMMM') AS MONTH FROM serverlogs").show()

+--------+--------+
|loglevel|   MONTH|
+--------+--------+
|    INFO|  August|
|    WARN| January|
|    INFO|    June|
|    INFO| January|
|   DEBUG|    July|
|    INFO|February|
|    INFO|    July|
|    INFO|   April|
|   DEBUG|November|
|    INFO|  August|
|   DEBUG|   April|
|    WARN|December|
+--------+--------+



appling aggregation

In [59]:
spark.sql("select loglevel, date_format(logtime, 'MMMM') AS MONTH,count(*) as total_occurence FROM serverlogs group by loglevel,month").show()

+--------+--------+---------------+
|loglevel|   MONTH|total_occurence|
+--------+--------+---------------+
|    INFO|    June|              1|
|    WARN|December|              1|
|   DEBUG|    July|              1|
|    INFO|February|              1|
|    WARN| January|              1|
|    INFO|  August|              2|
|   DEBUG|November|              1|
|    INFO|   April|              1|
|    INFO| January|              1|
|   DEBUG|   April|              1|
|    INFO|    July|              1|
+--------+--------+---------------+



In [60]:
logschema = "loglevel string, logtime timestamp"

In [61]:
log_df = spark.read \
.format("csv") \
.schema(logschema) \
.load("/public/trendytech/datasets/logdata1m.csv")

In [62]:
log_df.show()

+--------+-------------------+
|loglevel|            logtime|
+--------+-------------------+
|    INFO|2015-08-08 20:49:22|
|    WARN|2015-01-14 20:05:00|
|    INFO|2017-06-14 00:08:35|
|    INFO|2016-01-18 11:50:14|
|   DEBUG|2017-07-01 12:55:02|
|    INFO|2014-02-26 12:34:21|
|    INFO|2015-07-12 11:13:47|
|    INFO|2017-04-15 01:20:18|
|   DEBUG|2016-11-02 20:19:23|
|    INFO|2012-08-20 10:09:44|
|   DEBUG|2014-04-22 21:30:49|
|    WARN|2013-12-06 17:54:15|
|   DEBUG|2017-01-12 10:47:02|
|   DEBUG|2016-06-25 11:06:42|
|   ERROR|2015-06-28 19:25:05|
|   DEBUG|2012-06-24 01:06:37|
|    INFO|2014-12-09 09:53:54|
|   DEBUG|2015-11-08 19:20:08|
|    INFO|2017-07-21 18:34:18|
|   DEBUG|2014-12-26 06:38:42|
+--------+-------------------+
only showing top 20 rows



In [63]:
log_df.count()

1000000

In [68]:
log_df.createOrReplaceTempView("serverlog")

In [69]:
spark.sql("""select loglevel, 
date_format(logtime, 'MMMM') AS MONTH, 
int(date_format(logtime,'M')) as month_num, 
count(*) as total_occurence 
FROM serverlog group by loglevel,month,month_num order by month_num""").show(20)
#we need to cast month num to intiger bydefault its string and sorting is done as per ASCII

+--------+--------+---------+---------------+
|loglevel|   MONTH|month_num|total_occurence|
+--------+--------+---------+---------------+
|    WARN| January|        1|           8217|
|   DEBUG| January|        1|          41961|
|   FATAL| January|        1|             94|
|    INFO| January|        1|          29119|
|   ERROR| January|        1|           4054|
|    INFO|February|        2|          28983|
|   DEBUG|February|        2|          41734|
|    WARN|February|        2|           8266|
|   FATAL|February|        2|             72|
|   ERROR|February|        2|           4013|
|    INFO|   March|        3|          29095|
|   FATAL|   March|        3|             70|
|   ERROR|   March|        3|           4122|
|   DEBUG|   March|        3|          41652|
|    WARN|   March|        3|           8165|
|   DEBUG|   April|        4|          41869|
|   ERROR|   April|        4|           4107|
|    INFO|   April|        4|          29302|
|    WARN|   April|        4|     

In [71]:
#use month_num as aggregate
result_df = spark.sql("""select loglevel, 
date_format(logtime, 'MMMM') AS MONTH, 
first(date_format(logtime,'M')) as month_num, 
count(*) as total_occurence 
FROM serverlog group by loglevel,month order by month_num""")


In [72]:
final_df = result_df.drop("month_num")

In [73]:
final_df.show()

+--------+--------+---------------+
|loglevel|   MONTH|total_occurence|
+--------+--------+---------------+
|    INFO| January|          29119|
|   DEBUG| January|          41961|
|   FATAL| January|             94|
|   ERROR| January|           4054|
|    WARN| January|           8217|
|   DEBUG| October|          41936|
|   FATAL| October|             92|
|    INFO| October|          29018|
|    WARN| October|           8226|
|   ERROR| October|           4040|
|   DEBUG|November|          33366|
|   ERROR|November|           3389|
|    INFO|November|          23301|
|   FATAL|November|          16797|
|    WARN|November|           6616|
|    WARN|December|           8328|
|   ERROR|December|           4106|
|   FATAL|December|             94|
|    INFO|December|          28874|
|   DEBUG|December|          41749|
+--------+--------+---------------+
only showing top 20 rows



#### Pivot table

In [76]:
spark.sql("""select loglevel, date_format(logtime, 'MM') AS MONTH
FROM serverlog""").groupBy('loglevel').pivot('month').count().show()
#groupby as row, pivot as column

+--------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|loglevel|   01|   02|   03|   04|   05|   06|   07|   08|   09|   10|   11|   12|
+--------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|    INFO|29119|28983|29095|29302|28900|29143|29300|28993|29038|29018|23301|28874|
|   ERROR| 4054| 4013| 4122| 4107| 4086| 4059| 3976| 3987| 4161| 4040| 3389| 4106|
|    WARN| 8217| 8266| 8165| 8277| 8403| 8191| 8222| 8381| 8352| 8226| 6616| 8328|
|   DEBUG|41961|41734|41652|41869|41785|41774|42085|42147|41433|41936|33366|41749|
|   FATAL|   94|   72|   70|   83|   60|   78|   98|   80|   81|   92|16797|   94|
+--------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+



In [78]:
months_list = [
    "January",
    "February",
    "March",
    "April",
    "May",
    "June",
    "July",
    "August",
    "September",
    "October",
    "November",
    "December"
]


In [81]:
spark.sql("""select loglevel, date_format(logtime, 'MMMM') AS MONTH
FROM serverlog""").groupBy('loglevel').pivot('month',months_list).count().show()
#optimization: define a piviot list

+--------+-------+--------+-----+-----+-----+-----+-----+------+---------+-------+--------+--------+
|loglevel|January|February|March|April|  May| June| July|August|September|October|November|December|
+--------+-------+--------+-----+-----+-----+-----+-----+------+---------+-------+--------+--------+
|    INFO|  29119|   28983|29095|29302|28900|29143|29300| 28993|    29038|  29018|   23301|   28874|
|   ERROR|   4054|    4013| 4122| 4107| 4086| 4059| 3976|  3987|     4161|   4040|    3389|    4106|
|    WARN|   8217|    8266| 8165| 8277| 8403| 8191| 8222|  8381|     8352|   8226|    6616|    8328|
|   FATAL|     94|      72|   70|   83|   60|   78|   98|    80|       81|     92|   16797|      94|
|   DEBUG|  41961|   41734|41652|41869|41785|41774|42085| 42147|    41433|  41936|   33366|   41749|
+--------+-------+--------+-----+-----+-----+-----+-----+------+---------+-------+--------+--------+

