In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.utils import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel
import util
from config import *
import logging
import os

In [2]:
# Show INFO-level records so the progress messages logged later are visible.
logging.basicConfig(level=logging.INFO)

# Folder layout used by the notebook: source drops, data-lake zones, archive.
# `Paths` is not defined in this notebook (presumably it comes from `config`).
paths: Paths = Paths(
    dataLake="../DataLake/",
    srcSearches="../searches/",
    srcVisitors="../visitors/",
    rawSearches="../DataLake/raw/searches",
    rawVisitors="../DataLake/raw/visitors",
    ezSearches="../DataLake/ez/searches",
    ezVisitors="../DataLake/ez/visitors",
    archive="../archive/",
    archiveSearches="../archive/searches",
    archiveVisitors="../archive/visitors",
)

# Project helper; presumably prepares the folders listed above — see `util`.
util.rawZoneSetup(paths)

Task 1: Data Ingestion

In [3]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession
    .builder
    .appName("test")
    .getOrCreate()
)

# Folder that DataFrame.checkpoint() writes its files to.
spark.sparkContext.setCheckpointDir("../sparkCache")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/09/09 07:40:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
def dataIngestion(srcFolder,targetFolder,archiveFolder):
    """Convert each JSON file in ``srcFolder`` to Parquet, then archive the original.

    For every directory entry whose name contains ``".json"``:
      * the file is read with ``spark.read.json``,
      * written (append mode, snappy, a single output partition) to
        ``{targetFolder}/{ts}/`` where ``ts`` is whatever
        ``util.getTsFromFileName`` returns for the file name,
      * and the source file is moved into ``archiveFolder``.

    Regular files whose name does not contain ``".json"`` are deleted from
    ``srcFolder``. Sub-directories are left untouched.

    Args:
        srcFolder: folder holding the incoming files.
        targetFolder: root of the raw zone for this feed.
        archiveFolder: folder that receives the processed source files.
    """
    for f in os.listdir(srcFolder):
        srcFile=os.path.join(srcFolder,f)

        if ".json" not in f:
            # os.remove() raises on directories, so only regular files are
            # deleted; anything else is skipped instead of aborting the run.
            if os.path.isfile(srcFile):
                logging.warning(f"Removing non-JSON file {srcFile}")
                os.remove(srcFile)
            continue

        ts=util.getTsFromFileName(f)
        targetPath=f"{targetFolder}/{ts}/"
        # `header` is a CSV option and is not used by the Parquet writer,
        # so only the compression codec is passed.
        spark.read.json(srcFile)\
            .coalesce(1)\
            .write.mode("append")\
            .options(compression="snappy")\
            .parquet(targetPath)
        os.rename(srcFile,os.path.join(archiveFolder,f))


# Ingest both feeds, searches first: (source drop, raw zone, archive folder).
for srcDir, rawDir, archiveDir in (
    (paths.srcSearches, paths.rawSearches, paths.archiveSearches),
    (paths.srcVisitors, paths.rawVisitors, paths.archiveVisitors),
):
    dataIngestion(srcDir, rawDir, archiveDir)

                                                                                

Task 2: Preprocessing

In [5]:
def cleanVisitor(df:DataFrame)->DataFrame:
    """Return the visitors frame with normalised column types and a ``date`` column.

    Steps, in order:
      * ``hits_avg`` and ``visits`` are cast to integer, ``logged_in`` to boolean;
      * ``visit_start`` is passed through ``udateHandler`` and then parsed as a
        timestamp with the pattern ``yyyy-MM-dd HH:mm:ss``;
      * ``visitor_id`` is cast to string and trimmed;
      * ``date`` is derived from ``visit_start`` (formatted ``yyyy-MM-dd``,
        cast to a date);
      * null ``visitor_id`` values are replaced by the literal ``"na"``.
    """
    typed = (
        df
        .withColumn("hits_avg", df["hits_avg"].cast(IntegerType()))
        .withColumn("logged_in", df["logged_in"].cast(BooleanType()))
        .withColumn("visit_start", udateHandler(df["visit_start"]))
        .withColumn("visits", df["visits"].cast(IntegerType()))
        .withColumn("visitor_id", trim(df["visitor_id"].cast(StringType())))
    )

    # Parse the (already year-normalised) string into a real timestamp.
    stamped = typed.withColumn(
        "visit_start",
        to_timestamp(typed["visit_start"], "yyyy-MM-dd HH:mm:ss"),
    )

    # Calendar day of the visit.
    dated = stamped.withColumn(
        "date",
        date_format(stamped["visit_start"], "yyyy-MM-dd").cast(DateType()),
    )

    return dated.na.fill("na", ["visitor_id"])

def cleanSearches(df:DataFrame)->DataFrame:
    """Return the searches frame with a parsed timestamp, string ids and a ``date`` column.

    Steps, in order:
      * ``date_time`` is parsed as a timestamp with the pattern
        ``yyyy-MM-dd'T'HH:mm:ss.SSS'Z'``;
      * ``visitor_id`` is cast to string and trimmed;
      * ``date`` is derived from ``date_time`` (formatted ``yyyy-MM-dd``,
        cast to a date);
      * null ``visitor_id`` values are replaced by the literal ``"na"``.

    NOTE(review): the recorded schema of the raw searches frame shows
    ``visitor_id`` as a double. If the visitors feed stores the id as an
    integer or string, the string forms produced here may not match the
    visitors' ids — confirm, this could explain unmatched visitors downstream.
    """
    parsed = (
        df
        .withColumn("date_time", to_timestamp(df["date_time"], "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
        .withColumn("visitor_id", trim(df["visitor_id"].cast(StringType())))
    )

    # Calendar day of the search.
    dated = parsed.withColumn(
        "date",
        date_format(parsed["date_time"], "yyyy-MM-dd").cast(DateType()),
    )

    return dated.na.fill("na", ["visitor_id"])

def dateHandler(dateStr:str)->str:
    """Expand a two-digit leading year to four digits by prefixing ``"20"``.

    The text before the first ``-`` is treated as the year. When it is exactly
    two characters long, ``"20"`` is prepended to the whole string; otherwise
    the input is returned unchanged.

    Args:
        dateStr: date/time text such as ``"21-03-05 10:15:00"``, or ``None``.

    Returns:
        The (possibly expanded) string, or ``None`` when ``dateStr`` is ``None``.
    """
    # A Python UDF receives SQL NULL as None; pass it through instead of
    # failing on ``None.split``.
    if dateStr is None:
        return None

    year=dateStr.split("-")[0]
    if len(year)==2:
        dateStr="20"+dateStr
    return dateStr

# Column-level wrapper around dateHandler; the UDF returns a string column.
udateHandler = udf(dateHandler, StringType())

In [6]:
# Raw visitors: every timestamp folder under the raw zone.
# `header` / `inferSchema` belong to the CSV reader; Parquet files carry
# their own schema, so the options are not passed.
rawVisitorDF = spark.read.parquet(f"{paths.rawVisitors}/*").cache()
logging.info(f" rawVisitorPartitions = {rawVisitorDF.rdd.getNumPartitions()} , rawVisitorDF.count={rawVisitorDF.count()} ")
cleanVisitorDF = rawVisitorDF.transform(cleanVisitor)
# The cache is released straight after the count above.
rawVisitorDF.unpersist()

# Raw searches: same pattern. The second label now names the row count
# (it previously repeated "rawSearchesPartitions").
rawSearchesDF = spark.read.parquet(f"{paths.rawSearches}/*").cache()
logging.info(f" rawSearchesPartitions={rawSearchesDF.rdd.getNumPartitions()} , rawSearchesDF.count={rawSearchesDF.count()}")
cleanSearchesDF = rawSearchesDF.transform(cleanSearches)
rawSearchesDF.unpersist()

INFO:root: rawVisitorPartitions = 1 , rawVisitorDF.count=9999 
INFO:root: rawSearchesPartitions=1 , rawSearchesPartitions =13200


DataFrame[date_time: string, destination_out: string, destination_ret: string, flight_date_inbound: string, flight_date_outbound: string, origin_out: string, origin_ret: string, segments: bigint, visitor_id: double]

VisitorDimension

In [7]:
# Surrogate keys for every visitor_id present in the visitors feed.
# monotonically_increasing_id() is non-deterministic, and this frame is
# evaluated more than once below (max lookup, anti-join, union). It is
# persisted so every use sees the same keys.
visitorDimensionBase = (
    cleanVisitorDF
    .select("visitor_id")
    .distinct()
    .withColumn("visitorkey", monotonically_increasing_id())
    .persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
)

# `max` is the Spark SQL aggregate here (star-imported from pyspark.sql.functions).
lastKey = visitorDimensionBase.select(max(visitorDimensionBase.visitorkey).alias("max")).first()["max"]
if lastKey is None:
    # No visitors at all: start numbering the search-only ids at 0.
    lastKey = -1

# Ids that occur in searches but not in visitors get keys above lastKey,
# then both sets are combined into the final dimension.
visitorDimension = (
    cleanSearchesDF
    .select("visitor_id")
    .distinct()
    .join(visitorDimensionBase, ["visitor_id"], "left_anti")
    .select("visitor_id")
    .withColumn("visitorkey", monotonically_increasing_id() + lastKey + 1)
    .union(visitorDimensionBase)
    .persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
)

In [8]:
# Attach the visitor surrogate key to both cleaned feeds.
factVisitor = cleanVisitorDF.join(visitorDimension, ["visitor_id"], "left_outer").persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
factSearches = cleanSearchesDF.join(visitorDimension, ["visitor_id"], "left_outer").persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

# factVisitor=factVisitor.repartitionByRange(20,factVisitor.visitorkey)
# factSearches=factSearches.repartitionByRange(20,factSearches.visitorkey)

# checkpoint() returns the checkpointed DataFrame and leaves the receiver
# unchanged, so the result has to be kept for the checkpoint to be used.
factVisitor = factVisitor.checkpoint()
factSearches = factSearches.checkpoint()

# Display the resulting schema as the cell output.
factSearches

                                                                                

DataFrame[visitor_id: string, date_time: timestamp, destination_out: string, destination_ret: string, flight_date_inbound: string, flight_date_outbound: string, origin_out: string, origin_ret: string, segments: bigint, date: date, visitorkey: bigint]

PeriodDimension (Date)

In [9]:
# Surrogate keys for every calendar day present in the visitors feed.
# monotonically_increasing_id() is non-deterministic, so the frame is
# persisted before it is read for the max key and reused in the union.
peridDimensionBase = (
    cleanVisitorDF
    .select("date")
    .distinct()
    .withColumn("datekey", monotonically_increasing_id())
    .persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
)

# `max` is the Spark SQL aggregate here (star-imported from pyspark.sql.functions).
maxPeriodKey = peridDimensionBase.select(max(peridDimensionBase.datekey).alias("max")).first()["max"]
if maxPeriodKey is None:
    # No visitor dates at all: start numbering the search-only dates at 0.
    maxPeriodKey = -1

# Dates that occur only in searches get keys above maxPeriodKey. The rows are
# already distinct after the anti-join, so no second distinct() is needed.
# The final dimension is joined to both fact frames, so it is persisted to
# give both joins the same datekey for a given date.
peridDimension = (
    cleanSearchesDF
    .select("date")
    .distinct()
    .join(peridDimensionBase, ["date"], "left_anti")
    .select("date")
    .withColumn("datekey", maxPeriodKey + monotonically_increasing_id() + 1)
    .union(peridDimensionBase)
    .persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
)

In [10]:
# Attach the date surrogate key to both fact frames.
factVisitor = factVisitor.join(peridDimension, ["date"], "left_outer").persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
factSearches = factSearches.join(peridDimension, ["date"], "left_outer").persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

# checkpoint() returns the checkpointed DataFrame and leaves the receiver
# unchanged, so the result has to be kept for the checkpoint to be used.
factVisitor = factVisitor.checkpoint()
factSearches = factSearches.checkpoint()

# Display the resulting schema as the cell output.
factSearches

DataFrame[date: date, visitor_id: string, date_time: timestamp, destination_out: string, destination_ret: string, flight_date_inbound: string, flight_date_outbound: string, origin_out: string, origin_ret: string, segments: bigint, visitorkey: bigint, datekey: bigint]

Validations

In [11]:
# Validation: the row count of each fact frame must equal the row count of
# its raw source frame.
rawVisitorRows = rawVisitorDF.count()
factVisitorRows = factVisitor.count()
assert rawVisitorRows == factVisitorRows

rawSearchesRows = rawSearchesDF.count()
factSearchesRows = factSearches.count()
assert rawSearchesRows == factSearchesRows

In [None]:
Task 3: Reports

In [12]:
# Without Period Dim
# factVisitorGrouped=factVisitor.groupBy("visitorkey","date",factVisitor.datekey).agg(max("visit_start").alias("visit_start")).cache()
# factVisitorExtended=factVisitorGrouped\
#                         .join(factVisitor,["visitorkey","visit_start"])\
#                         .select("visitorkey","visit_start",factVisitorGrouped.date,factVisitorGrouped.datekey,"country","region")\
#                         .withColumnRenamed("visit_start","date_time")


# With Period Dimension
# Latest visit_start per (visitorkey, datekey). `max` is the Spark SQL
# aggregate (star-imported from pyspark.sql.functions).
factVisitorGrouped = (
    factVisitor
    .groupBy("visitorkey", factVisitor.datekey)
    .agg(max("visit_start").alias("visit_start"))
    .cache()
)

# Join back to the fact rows to pick up country/region of that latest visit;
# the grouped frame's datekey is kept and visit_start is exposed as date_time.
factVisitorExtended = (
    factVisitorGrouped
    .join(factVisitor, ["visitorkey", "visit_start"])
    .select("visitorkey", "visit_start", factVisitorGrouped.datekey, "country", "region")
    .withColumnRenamed("visit_start", "date_time")
)





Final Result

In [13]:
## Without Period Dim
# result = factSearches\
# .join(factVisitorExtended,["visitorkey","date"],"left_outer")\
# .select("country","region","visitorkey","date")\
# .groupBy("date","country","region")\
# .agg(count("*").alias("count")).cache()

## With Period Dim
# Every search row, with the visitor's latest visit of the same day attached
# when one exists (left outer join on visitorkey + datekey).
searchesWithVisit = factSearches.join(
    factVisitorExtended,
    ["visitorkey", "datekey"],
    "left_outer",
)

# Number of searches per day, country and region.
result = (
    searchesWithVisit
    .select("country", "region", "visitorkey", factSearches.date)
    .groupBy("date", "country", "region")
    .agg(count("*").alias("count"))
    .cache()
)

In [14]:
# SQL IN-list with the five sample days to inspect.
sampleDate = (
    "(cast('2021-03-05' as date),"
    "cast('2021-04-12' as date),"
    "cast('2021-01-27' as date),"
    "cast('2021-05-02' as date),"
    "cast('2021-05-08' as date) )"
)
result.where(f"date in {sampleDate}").show()
# result.filter(f"date = cast('2021-04-12' as date)").show()


                                                                                

+----------+-------+------+-----+
|      date|country|region|count|
+----------+-------+------+-----+
|2021-01-27|   null|  null|  109|
|2021-03-05|   null|  null|   89|
|2021-05-08|   null|  null|  113|
|2021-05-02|   null|  null|  108|
|2021-04-12|   null|  null|   99|
+----------+-------+------+-----+



In [None]:
# FIXME: country and region come out null in the result above, but should be esp, co
# cleanVisitorDF.filter(f"date = cast('2021-04-12' as date)").show()