In [1]:
import os
# Show the entries of the notebook's current working directory
os.listdir(os.getcwd())

['.ipynb_checkpoints',
 'derby.log',
 'useragent.ipynb',
 'spark-warehouse',
 'metastore_db']

In [2]:
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as f
import pyspark.sql.types as t
import re


In [3]:
# Create (or reuse) the notebook's Spark session: app name 'example',
# local master, Hive support enabled.
session_builder = SparkSession.builder.appName('example').master('local')
sparkSession = session_builder.enableHiveSupport().getOrCreate()

In [4]:
# List the databases visible to the Hive-enabled session
databases_df = sparkSession.sql("""
     show databases
""")
databases_df.toPandas()

Unnamed: 0,databaseName
0,default
1,logs


In [5]:
# Same listing through the catalog API, which also reports each database's description and location
sparkSession.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='file:/home/jovyan/useragent/spark-warehouse'),
 Database(name='logs', description='', locationUri='file:/home/jovyan/useragent/spark-warehouse/logs.db')]

In [6]:
# List the tables registered in the `default` database
default_tables_df = sparkSession.sql("""
     show tables in default
""")
default_tables_df.toPandas()

Unnamed: 0,database,tableName,isTemporary


In [7]:
# Catalog API view of the tables in `default`
sparkSession.catalog.listTables("default")


[]

In [8]:
# Create the `logs` database only when it does not exist yet, so re-running this cell does not fail
sparkSession.sql('create database IF NOT EXISTS logs')

DataFrame[]

In [9]:
# List the databases again after the `create database` statement to confirm `logs` is present
sparkSession.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='file:/home/jovyan/useragent/spark-warehouse'),
 Database(name='logs', description='', locationUri='file:/home/jovyan/useragent/spark-warehouse/logs.db')]

In [10]:
# Schema for the access-log CSV: every column is nullable; all are strings
# except `response` and `size`, which are integers.
schema = t.StructType([
    t.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("ip", t.StringType()),
        ("clientID", t.StringType()),
        ("userID", t.StringType()),
        ("time", t.StringType()),
        ("method", t.StringType()),
        ("endpoint", t.StringType()),
        ("protocol", t.StringType()),
        ("response", t.IntegerType()),
        ("size", t.IntegerType()),
        ("user_agent", t.StringType()),
    ]
])

In [11]:
# Read the log CSV with the explicit schema defined above, then confirm the
# result is a Spark DataFrame.
csv_reader = sparkSession.read
useragent_from_csv_df = csv_reader.csv("../data/user_agent.csv", schema)
type(useragent_from_csv_df)

pyspark.sql.dataframe.DataFrame

In [12]:
# Switch the session to the `logs` database, then store the CSV frame there
# as table `useragent`, overwriting any earlier copy.
sparkSession.sql('use logs')
table_writer = useragent_from_csv_df.write.mode('overwrite')
table_writer.saveAsTable("useragent")

In [13]:
# List the tables in `logs` to check that `useragent` is registered there
sparkSession.catalog.listTables("logs")

[Table(name='useragent', database='logs', description=None, tableType='MANAGED', isTemporary=False)]

In [14]:
# Read the rows back through Spark SQL (the table name resolves against the
# session's current database) and display at most 30 of them.
useragent_rows_df = sparkSession.sql("""
select *
from useragent

""")
useragent_rows_df.limit(30).toPandas()

Unnamed: 0,ip,clientID,userID,time,method,endpoint,protocol,response,size,user_agent
0,64.242.88.10,-,-,07/Mar/2004:16:05:49-0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846,Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKi...
1,64.242.88.10,-,-,07/Mar/2004:16:06:51 -0800,GET,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....,HTTP/1.1,200,4523,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US...
2,64.242.88.10,-,-,07/Mar/2004:16:10:02 -0800,GET,/mailman/listinfo/hsdivision,HTTP/1.1,200,6291,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US...
3,64.242.88.10,-,-,07/Mar/2004:16:11:58 -0800,GET,/twiki/bin/view/TWiki/WikiSyntax,HTTP/1.1,200,7352,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US...
4,64.240.88.10,-,-,07/Mar/2004:15:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846,
5,64.240.88.10,-,-,07/Mar/2004:15:06:51 -0800,GET,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....,HTTP/1.1,200,4523,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US...
6,64.240.88.10,-,-,07/Mar/2004:15:10:02 -0800,GET,/mailman/listinfo/hsdivision,HTTP/1.1,200,6291,Mozilla/5.0 (Windows22; U; Windows NT 5.1; en-...
7,64.240.88.10,-,-,07/Mar/2004:15:11:58 -0800,GET,/twiki/bin/view/TWiki/WikiSyntax,HTTP/1.1,200,7352,Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKi...
8,68.240.88.10,-,-,07/Mar/2004:14:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846,
9,68.240.88.10,-,-,07/Mar/2004:14:06:51 -0800,GET,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....,HTTP/1.1,200,4523,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US...


In [15]:
# Preview up to 10 rows of the frame as read from the CSV
useragent_from_csv_df.limit(10).toPandas()

Unnamed: 0,ip,clientID,userID,time,method,endpoint,protocol,response,size,user_agent
0,64.242.88.10,-,-,07/Mar/2004:16:05:49-0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846,Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKi...
1,64.242.88.10,-,-,07/Mar/2004:16:06:51 -0800,GET,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....,HTTP/1.1,200,4523,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US...
2,64.242.88.10,-,-,07/Mar/2004:16:10:02 -0800,GET,/mailman/listinfo/hsdivision,HTTP/1.1,200,6291,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US...
3,64.242.88.10,-,-,07/Mar/2004:16:11:58 -0800,GET,/twiki/bin/view/TWiki/WikiSyntax,HTTP/1.1,200,7352,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US...
4,64.240.88.10,-,-,07/Mar/2004:15:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846,
5,64.240.88.10,-,-,07/Mar/2004:15:06:51 -0800,GET,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....,HTTP/1.1,200,4523,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US...
6,64.240.88.10,-,-,07/Mar/2004:15:10:02 -0800,GET,/mailman/listinfo/hsdivision,HTTP/1.1,200,6291,Mozilla/5.0 (Windows22; U; Windows NT 5.1; en-...
7,64.240.88.10,-,-,07/Mar/2004:15:11:58 -0800,GET,/twiki/bin/view/TWiki/WikiSyntax,HTTP/1.1,200,7352,Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKi...
8,68.240.88.10,-,-,07/Mar/2004:14:05:49 -0800,GET,/twiki/bin/edit/Main/Double_bounce_sender?topi...,HTTP/1.1,401,12846,
9,68.240.88.10,-,-,07/Mar/2004:14:06:51 -0800,GET,/twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1....,HTTP/1.1,200,4523,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US...


In [16]:
#define parse_user_agent

def parse_user_agent(user_agent):
    """Split a raw User-Agent string into a list of cleaned word tokens.

    Version-like runs (digits, dots and underscores, with an optional
    leading slash) are removed first, then the punctuation characters
    ``; ( ) : ,`` are removed, and the remainder is split on whitespace.

    Args:
        user_agent: The raw User-Agent string, or None.

    Returns:
        A list of token strings (empty when the input is empty or blank),
        or None when ``user_agent`` is None.
    """
    if user_agent is None:
        return None

    # Raw strings: "\d", "\(" and "\)" are invalid escapes in a plain string
    # literal and make current Python versions warn when the cell is compiled.
    without_versions = re.sub(r"/?[\d_.]+", "", user_agent)
    without_punctuation = re.sub(r"[;\(\):,]", "", without_versions)
    return without_punctuation.split()

# Wrap the parser as a Spark UDF declared to return an array of strings
parse_user_agent_udf = f.udf(
    parse_user_agent,
    returnType=t.ArrayType(t.StringType()),
)

In [17]:
# Tokenise each user agent, flatten to one row per token, count how often
# each token occurs and show the result ordered by that count.
words_df = useragent_from_csv_df.select(
    parse_user_agent_udf("user_agent").alias("words")
)
word_counts_df = (
    words_df
    .select(f.explode("words").alias("word"))
    .groupBy("word")
    .agg(f.count("*").alias("count"))
    .orderBy(f.col("count"))
)
word_counts_df.toPandas()
    
                            
    

Unnamed: 0,word,count
0,compatible,1
1,+http//wwwbingcom/bingbothtm,1
2,ABCD,1
3,bingbot,1
4,WOW,2
5,U,7
6,en-US,7
7,KHTML,8
8,NT,9
9,AppleWebKit,9
