In [1]:
import os
# List the entries of the current working directory.
os.listdir()

['.ipynb_checkpoints',
 'derby.log',
 'timeseries.ipynb',
 'spark-warehouse',
 'metastore_db']

In [2]:
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as f


In [3]:
# Create (or reuse) a Spark session named 'example' that runs on a local
# master and has Hive support enabled.
sparkSession = SparkSession.builder \
    .appName('example') \
    .master('local') \
    .enableHiveSupport() \
    .getOrCreate()

In [4]:
# Check which Hive databases are available by running SHOW DATABASES through
# Spark SQL; the result is converted to a pandas DataFrame for display.
sparkSession.sql("""
     show databases
""").toPandas()

Unnamed: 0,databaseName
0,default
1,logs


In [5]:
# Same check through the catalog API, which returns Database objects
# (name, description, locationUri) instead of a result table.
sparkSession.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='file:/home/jovyan/hivetimeseries/spark-warehouse'),
 Database(name='logs', description='', locationUri='file:/home/jovyan/hivetimeseries/spark-warehouse/logs.db')]

In [6]:
# List the tables in the "default" database via Spark SQL and display the
# result as a pandas DataFrame.
sparkSession.sql("""
     show tables in default
""").toPandas()

Unnamed: 0,database,tableName,isTemporary


In [7]:
# Catalog-API equivalent: list the tables registered in the "default" database.
sparkSession.catalog.listTables("default")


[]

In [8]:
# Create the "logs" database; IF NOT EXISTS makes the statement safe to re-run.
sparkSession.sql('create database IF NOT EXISTS logs')

DataFrame[]

In [9]:
# List the databases again to verify that "logs" is present.
sparkSession.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='file:/home/jovyan/hivetimeseries/spark-warehouse'),
 Database(name='logs', description='', locationUri='file:/home/jovyan/hivetimeseries/spark-warehouse/logs.db')]

In [10]:
# Schema for the access-log CSV: one nullable field per column, declared as
# (column name, Spark data type) pairs in file order.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("ip", StringType()),
        ("clientID", StringType()),
        ("userID", StringType()),
        ("time", StringType()),
        ("method", StringType()),
        ("endpoint", StringType()),
        ("protocol", StringType()),
        ("response", IntegerType()),
        ("size", IntegerType()),
    ]
])

In [11]:
# Read the access-log CSV into a DataFrame, applying the explicit `schema`
# instead of letting Spark infer column types.
geoip_from_csv_df = sparkSession.read.csv("../data/access_logs.csv", schema=schema)
type(geoip_from_csv_df)

pyspark.sql.dataframe.DataFrame

In [12]:
# Make "logs" the session's current database, then persist the CSV DataFrame
# as a Hive table, overwriting any previous copy. The table name is qualified
# with its database so the write always lands in logs.weblogs, whatever the
# session's current database is when the cell is re-run.
sparkSession.sql('use logs')
geoip_from_csv_df.write.mode('overwrite').saveAsTable("logs.weblogs")

In [13]:
# Verify that the saved table is now registered in the "logs" database.
sparkSession.catalog.listTables("logs")

[Table(name='weblogs', database='logs', description=None, tableType='MANAGED', isTemporary=False)]

In [14]:
# Query the Hive table with SQL: keep only requests whose response code is
# not 200 and show at most 30 rows.
# `response` is declared as an IntegerType column, so it is compared with an
# integer literal (no implicit string cast). The table is qualified with its
# database so the query does not depend on the session's current database.
sparkSession.sql("""
select *
from logs.weblogs
where response <> 200
""").limit(30).toPandas()

Unnamed: 0,ip,clientID,userID,time,method,endpoint,protocol,response,size
0,64.242.88.10,-,-,07/Mar/2004:16:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846
1,64.240.88.10,-,-,07/Mar/2004:15:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846
2,68.240.88.10,-,-,07/Mar/2004:16:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846


In [15]:
# Query the DataFrame directly (no SQL): project the ip and endpoint columns
# and show at most 30 rows.
(
    geoip_from_csv_df
    .select("ip", "endpoint")
    .limit(30)
    .toPandas()
)

Unnamed: 0,ip,endpoint
0,64.242.88.10,/twiki/bin/edit/Main/Double_bounce_sender?topi...
1,64.242.88.10,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....
2,64.242.88.10,/mailman/listinfo/hsdivision
3,64.242.88.10,/twiki/bin/view/TWiki/WikiSyntax
4,64.240.88.10,/twiki/bin/edit/Main/Double_bounce_sender?topi...
5,64.240.88.10,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....
6,64.240.88.10,/mailman/listinfo/hsdivision
7,64.240.88.10,/twiki/bin/view/TWiki/WikiSyntax
8,68.240.88.10,/twiki/bin/edit/Main/Double_bounce_sender?topi...
9,68.240.88.10,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....


In [16]:
# Load the Hive table back as a DataFrame and preview the first five rows.
# The table name is qualified with its database so the read succeeds whatever
# the session's current database is.
weblogs_df = sparkSession.read.table("logs.weblogs")
weblogs_df.limit(5).toPandas()

Unnamed: 0,ip,clientID,userID,time,method,endpoint,protocol,response,size
0,64.242.88.10,-,-,07/Mar/2004:16:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846
1,64.242.88.10,-,-,07/Mar/2004:16:06:51 -0800,GET,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....,HTTP/1.1,200,4523
2,64.242.88.10,-,-,07/Mar/2004:16:10:02 -0800,GET,/mailman/listinfo/hsdivision,HTTP/1.1,200,6291
3,64.242.88.10,-,-,07/Mar/2004:16:11:58 -0800,GET,/twiki/bin/view/TWiki/WikiSyntax,HTTP/1.1,200,7352
4,64.240.88.10,-,-,07/Mar/2004:15:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846


In [17]:
# Show the Python type of the object returned by read.table.
type(weblogs_df)

pyspark.sql.dataframe.DataFrame

In [18]:
# Call Spark SQL's unix_timestamp() with no arguments, which yields the
# current time as seconds since the Unix epoch.
sparkSession.sql("""
   select unix_timestamp() as unix_timestamp
""").toPandas()


Unnamed: 0,unix_timestamp
0,1524452023


In [19]:
# Add a "unixtime" column by parsing the textual "time" column
# (e.g. 07/Mar/2004:16:05:49 -0800) into seconds since the Unix epoch.
weblogs_unixtime_df = weblogs_df.withColumn(
    "unixtime",
    f.unix_timestamp("time", format="dd/MMM/yyyy:HH:mm:ss Z"),
)

weblogs_unixtime_df.limit(30).toPandas()

Unnamed: 0,ip,clientID,userID,time,method,endpoint,protocol,response,size,unixtime
0,64.242.88.10,-,-,07/Mar/2004:16:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846,1078704349
1,64.242.88.10,-,-,07/Mar/2004:16:06:51 -0800,GET,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....,HTTP/1.1,200,4523,1078704411
2,64.242.88.10,-,-,07/Mar/2004:16:10:02 -0800,GET,/mailman/listinfo/hsdivision,HTTP/1.1,200,6291,1078704602
3,64.242.88.10,-,-,07/Mar/2004:16:11:58 -0800,GET,/twiki/bin/view/TWiki/WikiSyntax,HTTP/1.1,200,7352,1078704718
4,64.240.88.10,-,-,07/Mar/2004:15:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846,1078700749
5,64.240.88.10,-,-,07/Mar/2004:15:06:51 -0800,GET,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....,HTTP/1.1,200,4523,1078700811
6,64.240.88.10,-,-,07/Mar/2004:15:20:42 -0800,GET,/mailman/listinfo/hsdivision,HTTP/1.1,200,6291,1078701642
7,64.240.88.10,-,-,07/Mar/2004:15:31:58 -0800,GET,/twiki/bin/view/TWiki/WikiSyntax,HTTP/1.1,200,7352,1078702318
8,68.240.88.10,-,-,07/Mar/2004:16:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846,1078704349
9,68.240.88.10,-,-,07/Mar/2004:17:01:56 -0800,GET,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....,HTTP/1.1,200,4523,1078707716


In [20]:
# For each ip, find the earliest ("begin") and latest ("end") request time
# in epoch seconds.
(
    weblogs_unixtime_df
    .groupby("ip")
    .agg(
        f.min("unixtime").alias("begin"),
        f.max("unixtime").alias("end"),
    )
    .limit(30)
    .toPandas()
)

Unnamed: 0,ip,begin,end
0,68.240.88.10,1078704349,1078715578
1,64.240.88.10,1078700749,1078702318
2,64.242.88.10,1078704349,1078704718


In [21]:
# For each ip, compute the number of seconds between its first and last
# request (latest unixtime minus earliest unixtime).
(
    weblogs_unixtime_df
    .groupby("ip")
    .agg(
        f.min("unixtime").alias("begin"),
        f.max("unixtime").alias("end"),
    )
    .select("ip", f.col("end") - f.col("begin"))
    .limit(30)
    .toPandas()
)

Unnamed: 0,ip,(end - begin)
0,68.240.88.10,11229
1,64.240.88.10,1569
2,64.242.88.10,369
