# <center> Manage Columnar Drifting on Spark Hive </center>
author: [Vinicius R. Barros](http://github.com/muderno)

In [39]:
pip install pyspark findspark pandas

Note: you may need to restart the kernel to use updated packages.


In [40]:
jul95 = "NASA_access_log_Jul95"

In [41]:
# import gzip
# import shutil
# import urllib.request

# url_base = "ftp://ita.ee.lbl.gov/traces/"
# files = jul95

# for file in files:
#     urllib.request.urlretrieve(url_base+file+".gz", file+".gz")
#     with gzip.open(file+".gz", 'rb') as f_in:
#         with open(file, 'wb') as f_out:
#             shutil.copyfileobj(f_in, f_out)

In [42]:
import findspark                         # Import FindSpark
findspark.init()                         # Initiate FindSpark 
                                         # If your $SPARK_HOME is not correct you can use: 
                                         # findspark.init('/path/to/spark/home')

from pyspark import SparkContext         # Needed to create a SparkContext
from pyspark.sql import SparkSession     # Needed to create a SparkSession
from pyspark.sql.types import *          # Needed to define DataFrame Schema.
from pyspark.sql.functions import *      # Needed to SQL functions as Max(), Avg(), etc...
import pandas as pd                      # Needed to let it pretty
import re                                # Needed for regex capability
from datetime import datetime            # To cast dates and timestamps

 Let's create a Spark Context and a Spark Session.

In [43]:
sc = SparkContext.getOrCreate()
    spark = SparkSession.builder.getOrCreate()

We can define a regex pattern to break the log into columns:

In [44]:
pattern = r"^(.*)\s(.*)\s(.*)\s(\[.*\])\s(\".*\")\s(.*)\s(.*)$"
regex = re.compile(pattern)

Then we can create a Schema that will be needed to transform the RDD into Dataframe. We will passa all the fields and add a logId to be an ID coloumn and pass the full line as a field for comparison. Spark SQL Types will help us to acomplish this.

In [45]:
# Define schema to create DataFrame with an array typed column.
logSchema = StructType([StructField("logId", IntegerType(), True),
                       StructField("hostname", StringType(), True),
                       StructField("logname", StringType(), True),
                       StructField("username", StringType(), True),
                       StructField("reqTime", TimestampType(), True),
                       StructField("reqTimeStr",StringType(), True),
                       StructField("firstLine", StringType(), True),
                       StructField("finalStatus", IntegerType(), True),
                       StructField("reqSize", IntegerType(), True),
                       StructField("line", StringType(), True)])

As our logic wont fit in a anonimous function, we need to difine a function to process the data. We declare `logSK` as global variable, it's a SurrogateKey to help us make the ID. In this function we try to break the string into fields and convert them to their proper SQL Types. Returning diferent objects sizes is intentional, that is easier to filter after.

In [46]:
logSK = 0
#>>> import iso8601
#>>> iso8601.parse_date(utcnow().isoformat())
def splitLog(text):
    global logSK
    try :
        logList     = regex.match(text).groups()
        
        logSK       = logSK + 1
        hostname    = logList[0]
        logname     = logList[1]
        username    = logList[2]
        reqTime     = datetime.strptime(logList[3], "[%d/%b/%Y:%H:%M:%S %z]")
        reqTimeStr  = logList[3]
        firstLine   = logList[4]
        finalStatus = int(logList[5] if logList[5].isnumeric() else '0')
        reqSize     = int(logList[6] if logList[6].isnumeric() else '0')
        line        = text
        
        return (logSK, hostname, logname, username, reqTime, reqTimeStr, firstLine, finalStatus, reqSize, line)
    
    except:
        return (logSK, "ERROR","Error while processing line "+str(logSK), text)
    

Now we can use the Spark Context we created to load the files into RDDs ([Resilient Distributed Datasets](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds)). This will load the files by lines.

In [47]:
linesJul = sc.textFile(jul95)

And merge two RDDs into one as all the operations will be the same for the both.

In [48]:
lines = linesJul

Now is where the magic happens. First we use map function to run trought all lines, with we use the flatMap function it will ignore lines taking all the lines together as a single block of characters. Let's cache it in memory as we'll use it a lot.

In [49]:
linesMapped   = lines.map(lambda a: splitLog(a))

In [50]:
linesSplited  = linesMapped.filter(lambda b: len(b) == 10)
linesRejected = linesMapped.filter(lambda b: len(b) < 10)

In [51]:
print("Splited: ", linesSplited.count())
linesSplited.toDF().limit(3).toPandas()

Splited:  1891714


Unnamed: 0,_1,_2,_3,_4,_5,_6,_7,_8,_9,_10
0,1,199.72.81.55,-,-,1995-07-01 01:00:01,[01/Jul/1995:00:00:01 -0400],"""GET /history/apollo/ HTTP/1.0""",200,6245,199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ...
1,2,unicomp6.unicomp.net,-,-,1995-07-01 01:00:06,[01/Jul/1995:00:00:06 -0400],"""GET /shuttle/countdown/ HTTP/1.0""",200,3985,unicomp6.unicomp.net - - [01/Jul/1995:00:00:06...
2,3,199.120.110.21,-,-,1995-07-01 01:00:09,[01/Jul/1995:00:00:09 -0400],"""GET /shuttle/missions/sts-73/mission-sts-73.h...",200,4085,199.120.110.21 - - [01/Jul/1995:00:00:09 -0400...


And let's see what are the lines that couldn't be processed.

In [52]:
print("Rejected: ", linesRejected.count())
linesRejected.toDF().toPandas()

Rejected:  1


Unnamed: 0,_1,_2,_3,_4
0,36513,ERROR,Error while processing line 36513,alyssa.p


Now we'll create a [Dataframe](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes) to explore better the data. Dataframes give us the SQL functionality to explore data. 

As we will use this dataframe for all analysis, we can cache it to make consultations faster

In [53]:
logDF = spark.createDataFrame(linesSplited, schema= logSchema)

And make it pretty here. To not get an Out of Memory error, we can limit the dataset before converting it to Pandas dataset.

In [54]:
logDF.limit(3).toPandas()


Unnamed: 0,logId,hostname,logname,username,reqTime,reqTimeStr,firstLine,finalStatus,reqSize,line
0,1,199.72.81.55,-,-,1995-07-01 01:00:01,[01/Jul/1995:00:00:01 -0400],"""GET /history/apollo/ HTTP/1.0""",200,6245,199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ...
1,2,unicomp6.unicomp.net,-,-,1995-07-01 01:00:06,[01/Jul/1995:00:00:06 -0400],"""GET /shuttle/countdown/ HTTP/1.0""",200,3985,unicomp6.unicomp.net - - [01/Jul/1995:00:00:06...
2,3,199.120.110.21,-,-,1995-07-01 01:00:09,[01/Jul/1995:00:00:09 -0400],"""GET /shuttle/missions/sts-73/mission-sts-73.h...",200,4085,199.120.110.21 - - [01/Jul/1995:00:00:09 -0400...


In [55]:
logDF.schema

StructType(List(StructField(logId,IntegerType,true),StructField(hostname,StringType,true),StructField(logname,StringType,true),StructField(username,StringType,true),StructField(reqTime,TimestampType,true),StructField(reqTimeStr,StringType,true),StructField(firstLine,StringType,true),StructField(finalStatus,IntegerType,true),StructField(reqSize,IntegerType,true),StructField(line,StringType,true)))

In [56]:
logDF2 = logDF.withColumn("df2Plus1",expr('substring_index(substring_index(firstLine, " ", -2), " ", 1)')).withColumn("df2Plus2",expr('substring(reqTimeStr, 5, 3)')).limit(5)
logDF1 = logDF.limit(5)

In [57]:
logDF1 = logDF1.withColumn("diffType",expr('substring(reqTimeStr, 2, 2)'))
logDF2 = logDF2.withColumn("diffType",expr('substring(reqTimeStr, 2, 2)').cast("int"))

In [58]:
logDF1 = logDF1.withColumn("df1Plus1",expr('substring(reqTimeStr, 2, 2)'))

In [59]:
log1Schema = logDF.schema
log2Schema = logDF2.schema
print("log1: %s\n\nlog2:%s" % (log1Schema, log2Schema))

log1: StructType(List(StructField(logId,IntegerType,true),StructField(hostname,StringType,true),StructField(logname,StringType,true),StructField(username,StringType,true),StructField(reqTime,TimestampType,true),StructField(reqTimeStr,StringType,true),StructField(firstLine,StringType,true),StructField(finalStatus,IntegerType,true),StructField(reqSize,IntegerType,true),StructField(line,StringType,true)))

log2:StructType(List(StructField(logId,IntegerType,true),StructField(hostname,StringType,true),StructField(logname,StringType,true),StructField(username,StringType,true),StructField(reqTime,TimestampType,true),StructField(reqTimeStr,StringType,true),StructField(firstLine,StringType,true),StructField(finalStatus,IntegerType,true),StructField(reqSize,IntegerType,true),StructField(line,StringType,true),StructField(df2Plus1,StringType,true),StructField(df2Plus2,StringType,true),StructField(diffType,IntegerType,true)))


In [60]:
log1Schema.fields

[StructField(logId,IntegerType,true),
 StructField(hostname,StringType,true),
 StructField(logname,StringType,true),
 StructField(username,StringType,true),
 StructField(reqTime,TimestampType,true),
 StructField(reqTimeStr,StringType,true),
 StructField(firstLine,StringType,true),
 StructField(finalStatus,IntegerType,true),
 StructField(reqSize,IntegerType,true),
 StructField(line,StringType,true)]

In [61]:
log2Schema.fields

[StructField(logId,IntegerType,true),
 StructField(hostname,StringType,true),
 StructField(logname,StringType,true),
 StructField(username,StringType,true),
 StructField(reqTime,TimestampType,true),
 StructField(reqTimeStr,StringType,true),
 StructField(firstLine,StringType,true),
 StructField(finalStatus,IntegerType,true),
 StructField(reqSize,IntegerType,true),
 StructField(line,StringType,true),
 StructField(df2Plus1,StringType,true),
 StructField(df2Plus2,StringType,true),
 StructField(diffType,IntegerType,true)]

In [62]:
# List Comprehension 
# ls1 = [column for column in logDF1.schema.fields]
# print("1-> ",ls1)
# ls2 = [column for column in logDF2.schema.fields]
# print("2-> ",ls2)

In [63]:
logDF1.limit(1).toPandas()

Unnamed: 0,logId,hostname,logname,username,reqTime,reqTimeStr,firstLine,finalStatus,reqSize,line,diffType,df1Plus1
0,1,199.72.81.55,-,-,1995-07-01 01:00:01,[01/Jul/1995:00:00:01 -0400],"""GET /history/apollo/ HTTP/1.0""",200,6245,199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ...,1,1


In [64]:
logDF2.limit(1).toPandas()

Unnamed: 0,logId,hostname,logname,username,reqTime,reqTimeStr,firstLine,finalStatus,reqSize,line,df2Plus1,df2Plus2,diffType
0,1,199.72.81.55,-,-,1995-07-01 01:00:01,[01/Jul/1995:00:00:01 -0400],"""GET /history/apollo/ HTTP/1.0""",200,6245,199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ...,/history/apollo/,Jul,1


In [65]:
colNew = []
colMissing = []
colHas = []

colMissing = [column for column in [column for column in logDF1.schema.fields] 
           if column.name not in [column.name for column in logDF2.schema.fields]]

colNew = [column for column in [column for column in logDF2.schema.fields] 
          if column.name not in [column.name for column in logDF1.schema.fields]]

colHas = [column for column in [column for column in logDF2.schema.fields] 
          if column.name in [column.name for column in logDF1.schema.fields]]
print("Missing columns :%s\n\nNew columns: %s\n\nPresent columns: %s" % (colMissing, colNew, colHas)) 

Missing columns :[StructField(df1Plus1,StringType,true)]

New columns: [StructField(df2Plus1,StringType,true), StructField(df2Plus2,StringType,true)]

Present columns: [StructField(logId,IntegerType,true), StructField(hostname,StringType,true), StructField(logname,StringType,true), StructField(username,StringType,true), StructField(reqTime,TimestampType,true), StructField(reqTimeStr,StringType,true), StructField(firstLine,StringType,true), StructField(finalStatus,IntegerType,true), StructField(reqSize,IntegerType,true), StructField(line,StringType,true), StructField(diffType,IntegerType,true)]


In [66]:
colAll = []
colAll = [column for column in [column for column in logDF2.schema.fields] 
          if column.name in [column.name for column in logDF1.schema.fields]]
print('\n0: size:%s fields ->\n' % len(colAll), colAll)
colAll.extend(colNew)
print('\n1: size:%s with colNew fields ->\n' % len(colAll), colAll)
colAll.extend(colMissing)
print('\n2: size:%s with colNull fields ->\n' % len(colAll), colAll)


0: size:11 fields ->
 [StructField(logId,IntegerType,true), StructField(hostname,StringType,true), StructField(logname,StringType,true), StructField(username,StringType,true), StructField(reqTime,TimestampType,true), StructField(reqTimeStr,StringType,true), StructField(firstLine,StringType,true), StructField(finalStatus,IntegerType,true), StructField(reqSize,IntegerType,true), StructField(line,StringType,true), StructField(diffType,IntegerType,true)]

1: size:13 with colNew fields ->
 [StructField(logId,IntegerType,true), StructField(hostname,StringType,true), StructField(logname,StringType,true), StructField(username,StringType,true), StructField(reqTime,TimestampType,true), StructField(reqTimeStr,StringType,true), StructField(firstLine,StringType,true), StructField(finalStatus,IntegerType,true), StructField(reqSize,IntegerType,true), StructField(line,StringType,true), StructField(diffType,IntegerType,true), StructField(df2Plus1,StringType,true), StructField(df2Plus2,StringType,true)

In [67]:
colSelect = []

colSelect = [column for column in [column for column in logDF2.schema.fields] 
          if column.name in [column.name for column in logDF1.schema.fields]]
colSelect.extend(colNew)
print(colSelect)
logDF2.select([column.name for column in colSelect]).limit(10).toPandas()

[StructField(logId,IntegerType,true), StructField(hostname,StringType,true), StructField(logname,StringType,true), StructField(username,StringType,true), StructField(reqTime,TimestampType,true), StructField(reqTimeStr,StringType,true), StructField(firstLine,StringType,true), StructField(finalStatus,IntegerType,true), StructField(reqSize,IntegerType,true), StructField(line,StringType,true), StructField(diffType,IntegerType,true), StructField(df2Plus1,StringType,true), StructField(df2Plus2,StringType,true)]


Unnamed: 0,logId,hostname,logname,username,reqTime,reqTimeStr,firstLine,finalStatus,reqSize,line,diffType,df2Plus1,df2Plus2
0,1,199.72.81.55,-,-,1995-07-01 01:00:01,[01/Jul/1995:00:00:01 -0400],"""GET /history/apollo/ HTTP/1.0""",200,6245,199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ...,1,/history/apollo/,Jul
1,2,unicomp6.unicomp.net,-,-,1995-07-01 01:00:06,[01/Jul/1995:00:00:06 -0400],"""GET /shuttle/countdown/ HTTP/1.0""",200,3985,unicomp6.unicomp.net - - [01/Jul/1995:00:00:06...,1,/shuttle/countdown/,Jul
2,3,199.120.110.21,-,-,1995-07-01 01:00:09,[01/Jul/1995:00:00:09 -0400],"""GET /shuttle/missions/sts-73/mission-sts-73.h...",200,4085,199.120.110.21 - - [01/Jul/1995:00:00:09 -0400...,1,/shuttle/missions/sts-73/mission-sts-73.html,Jul
3,4,burger.letters.com,-,-,1995-07-01 01:00:11,[01/Jul/1995:00:00:11 -0400],"""GET /shuttle/countdown/liftoff.html HTTP/1.0""",304,0,burger.letters.com - - [01/Jul/1995:00:00:11 -...,1,/shuttle/countdown/liftoff.html,Jul
4,5,199.120.110.21,-,-,1995-07-01 01:00:11,[01/Jul/1995:00:00:11 -0400],"""GET /shuttle/missions/sts-73/sts-73-patch-sma...",200,4179,199.120.110.21 - - [01/Jul/1995:00:00:11 -0400...,1,/shuttle/missions/sts-73/sts-73-patch-small.gif,Jul


In [68]:
logDF2.select([column.name for column in colNew]).limit(10).toPandas()

Unnamed: 0,df2Plus1,df2Plus2
0,/history/apollo/,Jul
1,/shuttle/countdown/,Jul
2,/shuttle/missions/sts-73/mission-sts-73.html,Jul
3,/shuttle/countdown/liftoff.html,Jul
4,/shuttle/missions/sts-73/sts-73-patch-small.gif,Jul


In [69]:
colsToAlterHiveTable = [column for column in [column for column in logDF2.schema.fields] 
          if column.name not in [column.name for column in logDF1.schema.fields]]
colsToAppendOnDataframe = [column for column in [column for column in logDF1.schema.fields] 
           if column.name not in [column.name for column in logDF2.schema.fields]]
colsAlredyHave = logDF2.schema.fields

In [70]:
tempDF = logDF2.limit(100)

colsAlredyHave.extend(colsToAppendOnDataframe)
final_struc = StructType(fields=colsAlredyHave)
for col in colsToAppendOnDataframe:
    tempDF = tempDF.withColumn(col.name, lit(None).cast(col.dataType))
tempDF = tempDF.select([column.name for column in colsAlredyHave])
rightDF = spark.createDataFrame(tempDF.rdd, schema= final_struc)
rightDF.limit(1).toPandas()

Unnamed: 0,logId,hostname,logname,username,reqTime,reqTimeStr,firstLine,finalStatus,reqSize,line,df2Plus1,df2Plus2,diffType,df1Plus1
0,1,199.72.81.55,-,-,1995-07-01 01:00:01,[01/Jul/1995:00:00:01 -0400],"""GET /history/apollo/ HTTP/1.0""",200,6245,199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ...,/history/apollo/,Jul,1,


In [71]:
fieldsInfo = ", ".join(["%s %s" % (col.name, col.dataType.typeName().upper()) for col in colsToAlterHiveTable])
#query = "ALTER TABLE employee ADD COLUMNS (%s);" % fields
tableName="datalogger"
query = "ALTER TABLE %(tableName)s ADD COLUMNS (%(fields)s);" % {"fields":fieldsInfo, "tableName":tableName}
print("\n Query result:",query)


 Query result: ALTER TABLE datalogger ADD COLUMNS (df2Plus1 STRING, df2Plus2 STRING);


In [72]:
# 
def dataDriftMangerForHive(sourceDF, targetTableName, cursor):
    
    # Get hive's table as dataframe using Spark SQL
    #hiveDF = spark.sql("Select * from " + targetTableName)
    hiveDF = logDF1
    
    # Get the hive's table fields in right order
    fieldsInHivesRightOrder = hiveDF.schema.fields
    # Get the fields that sources dataframe already have
    fieldsAlredyHave = sourceDF.schema.fields
    
    # Get the fields that source dataframe has the more than the hive's table dataframe.
    fieldsToAddToHiveTable = [field for field in [field for field in fieldsAlredyHave] 
          if field.name not in [field.name for field in fieldsInHivesRightOrder]]
    # Get the fields that hive's table dataframe has the more than the source dataframe.
    fieldsToAppendOnDataframe = [field for field in [field for field in fieldsInHivesRightOrder] 
           if field.name not in [field.name for field in fieldsAlredyHave]]
    
    
    
    # Alter the table structure if neeed.
    if fieldsToAddToHiveTable:
        fieldsInfo = ", ".join(["%s %s" % (field.name, field.dataType.typeName().upper()) for field in fieldsToAddToHiveTable])
        query = "ALTER TABLE %(targetTableName)s ADD COLUMNS (%(fields)s);" % {"fields":fieldsInfo, "targetTableName":targetTableName}
        
        #Need  query executor using the right JDBC, not Spark's SQL JDBC
        try:
            #cursor.execute(query)
            print("Executing: '%s'" % query)
        except SQLException as s:
            return None
        # Get the hive's table fields in new right order
        # fieldsInHivesRightOrder = spark.sql("Select * from " + targetTableName).schema.fields
        fieldsInHivesRightOrder = hiveDF.schema.fields
    # Append the fields that source dataframe doesn't have

    for field in fieldsToAppendOnDataframe:
        sourceDF = sourceDF.withColumn(field.name, lit(None).cast(field.dataType))

    # Create a hive ordered
    sourceDF = sourceDF.select([field.name for field in fieldsInHivesRightOrder])

    # Create a StructType for new Schema
    finalStruc = StructType(fields=fieldsInHivesRightOrder)
    
    # Return a new right ordered dataframe
    return spark.createDataFrame(sourceDF.rdd, schema = finalStruc)

ret = dataDriftMangerForHive(logDF2, "datalogger", None)
ret.limit(2).toPandas()

Executing: 'ALTER TABLE datalogger ADD COLUMNS (df2Plus1 STRING, df2Plus2 STRING);'


AnalysisException: "cannot resolve '`df1Plus1`' given input columns: [diffType, reqSize, logname, reqTime, df2Plus2, hostname, finalStatus, reqTimeStr, logId, line, df2Plus1, username, firstLine];;\n'Project [logId#212, hostname#213, logname#214, username#215, reqTime#216, reqTimeStr#217, firstLine#218, finalStatus#219, reqSize#220, line#221, diffType#269, 'df1Plus1]\n+- Project [logId#212, hostname#213, logname#214, username#215, reqTime#216, reqTimeStr#217, firstLine#218, finalStatus#219, reqSize#220, line#221, df2Plus1#232, df2Plus2#244, cast(substring(reqTimeStr#217, 2, 2) as int) AS diffType#269]\n   +- GlobalLimit 5\n      +- LocalLimit 5\n         +- Project [logId#212, hostname#213, logname#214, username#215, reqTime#216, reqTimeStr#217, firstLine#218, finalStatus#219, reqSize#220, line#221, df2Plus1#232, substring(reqTimeStr#217, 5, 3) AS df2Plus2#244]\n            +- Project [logId#212, hostname#213, logname#214, username#215, reqTime#216, reqTimeStr#217, firstLine#218, finalStatus#219, reqSize#220, line#221, substring_index(substring_index(firstLine#218,  , -2),  , 1) AS df2Plus1#232]\n               +- LogicalRDD [logId#212, hostname#213, logname#214, username#215, reqTime#216, reqTimeStr#217, firstLine#218, finalStatus#219, reqSize#220, line#221], false\n"