# Import All Usefule Libraries and Modules

In [2]:
from pyspark.sql import SparkSession
#import pyspark.pandas as pd
#import pandas as ps
from pyspark.sql.functions import trim,upper,lower,to_date,to_timestamp,transform,split,col,substring
from pyspark.sql.functions import dayofmonth,month,hour
from pyspark.sql.types import StringType,IntegerType
from pyspark.sql.functions import sum,avg,max,count
from pyspark.sql.functions import udf,col
from fastavro import reader
from pathlib import Path
import logging


log = logging.getLogger('test')
log.setLevel(logging.DEBUG)

def avro_reader(filename):
    with open(filename, 'rb') as fo:
        avro_reader = reader(fo)
        records = [r for r in avro_reader]
        return records


sc=SparkSession\
           .builder\
           .appName("SparkSQLDenoramlisedApp")\
           .getOrCreate()

# Input & Output Path Definition

In [30]:
FILE_TO_TRANSFORM = "kcc.parquet"

STAGING_PATH = Path.cwd().parent.joinpath("STAGING_LAKE",FILE_TO_TRANSFORM)
#print(STAGING_PATH)

OUTPUT = Path.cwd().parent.joinpath("DOWNSTREAM_READY_EXTRACTS")

PATH_TO_BLOCKS_DF = Path.cwd().parent.joinpath('ARCHIVE','blocks_coordinates.csv')

PATH_TO_CALENDER = Path.cwd().parent.joinpath('ARCHIVE','calender_range_20230601_20231231.avro')


if STAGING_PATH.exists():
    print("staging path exists")
else:
    log.warning("Staging path not exists,need to create")
    
    


staging path exists


# Reading all required datasets processed after extracts generation

In [47]:
kccDF = sc.read.parquet(str(STAGING_PATH)+"/kcc.parquet").select('Sector','Category','Crops','QueryType','StateName','DistrictName','BlockName','QueryText','kccEng','createdTime','convertedDate','createdHour')
kccDF.printSchema()

kccDF.show(3)

root
 |-- Sector: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Crops: string (nullable = true)
 |-- QueryType: string (nullable = true)
 |-- StateName: string (nullable = true)
 |-- DistrictName: string (nullable = true)
 |-- BlockName: string (nullable = true)
 |-- QueryText: string (nullable = true)
 |-- kccEng: string (nullable = true)
 |-- createdTime: string (nullable = true)
 |-- convertedDate: date (nullable = true)
 |-- createdHour: integer (nullable = true)

+-----------+--------+----------+------------------+--------------+-------------+----------+--------------------+--------------------+-----------+-------------+-----------+
|     Sector|Category|     Crops|         QueryType|     StateName| DistrictName| BlockName|           QueryText|              kccEng|createdTime|convertedDate|createdHour|
+-----------+--------+----------+------------------+--------------+-------------+----------+--------------------+--------------------+-----------+-----------

# Checking for any Null Values in KccDF

In [48]:
#Checking kccDF if any timestamp is null ,Block if null ,if  Crop, QueryText is null,
null_count = kccDF.where(kccDF.createdTime.isNull() | kccDF.BlockName.isNull() | kccDF.Crops.isNull() | kccDF.QueryText.isNull()).count()

if null_count == 0:
    print("No Null Values in either Data Entities")
else:
    print("Null Values found ")

No Null Values in either Data Entities


# Reading Blocks Cordinates Data with BlockName should be unique

In [28]:

blockDF = sc.read.format('csv').option('inferSchema',True).option('header',True).load(str(PATH_TO_BLOCKS_DF))

#Renaming BlockName to avoid ambiguous conlficts between kccDF & blockDF 
blockDF = blockDF.withColumnRenamed("BlockName","blocks")
#removing duplicates
blockDF_unique = blockDF.dropDuplicates(['blocks'])

#avoid if file appended with header:true while generating blocks coordinates
blockDFValid = blockDF_unique.where('blocks !="BlockName"')
#blockDF.printSchema()

#blockDFValid.count()
blockDF.show(5)

+------------+------------------+-----------------+
|      blocks|          latitude|        longitude|
+------------+------------------+-----------------+
|  PEDDAPURAM|       17.09813605|82.13236972772236|
| ATREYAPURAM|16.853765199999998|81.78261498987507|
|  RANGAMPETA|       17.05134845|81.99740312853817|
|MUMMIDIVARAM|16.646411450000002|82.11641830975093|
|    KAJULURU|        16.7973104|82.15952482953708|
+------------+------------------+-----------------+
only showing top 5 rows



# Testing Data Validity For block coordinates DF:blockDF

In [106]:

#check if any block name is null
#blockDFValid.where(blockDFValid.blocks.isNull()).collect()
from pyspark_assert import assert_frame_equal

try:
    assert_frame_equal(blockDF, blockDFValid)
except Exception as e:
    logging.warning('Could contain duplicate values or heading appended multiple Times')
    logging.warning(e)
    
finally:
    log.info('\n blockDFValid is cleaned ')

	Expected : 56
	Actual   : 113
INFO:test:
 blockDFValid is cleaned 


# Reading Calender Data for range inclusive of kccDF

In [104]:
calender_dic = avro_reader(str(PATH_TO_CALENDER))
calenderDF = sc.createDataFrame(calender_dic).drop_duplicates(['date'])
calenderDF.printSchema()
log.debug(calenderDF.count())

root
 |-- MonthName: string (nullable = true)
 |-- date: string (nullable = true)
 |-- day: long (nullable = true)
 |-- month: long (nullable = true)
 |-- quarter: long (nullable = true)
 |-- weekName: string (nullable = true)
 |-- weekday: long (nullable = true)



DEBUG:test:214


In [105]:
calenderDF = calenderDF.withColumn("datePart",substring(col("date"),1,10))
calenderDF.show(4)

+---------+-------------------+---+-----+-------+---------+-------+----------+
|MonthName|               date|day|month|quarter| weekName|weekday|  datePart|
+---------+-------------------+---+-----+-------+---------+-------+----------+
|     June|2023-06-14T00:00:00| 14|    6|      2|Wednesday|      2|2023-06-14|
|September|2023-09-22T00:00:00| 22|    9|      3|   Friday|      4|2023-09-22|
|     June|2023-06-27T00:00:00| 27|    6|      2|  Tuesday|      1|2023-06-27|
|September|2023-09-16T00:00:00| 16|    9|      3| Saturday|      5|2023-09-16|
+---------+-------------------+---+-----+-------+---------+-------+----------+
only showing top 4 rows



# Merging/Joining all imported DataSets with Star Schema Data Modeling concept.

In [126]:
mergedKccDF = kccDF.join(blockDFValid,kccDF.BlockName ==  lower(blockDFValid.blocks),"left")\
        .join(calenderDF,kccDF.convertedDate.cast(StringType()) == calenderDF.datePart,"left")

mergedKccDF.printSchema()


root
 |-- Sector: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Crops: string (nullable = true)
 |-- QueryType: string (nullable = true)
 |-- StateName: string (nullable = true)
 |-- DistrictName: string (nullable = true)
 |-- BlockName: string (nullable = true)
 |-- QueryText: string (nullable = true)
 |-- kccEng: string (nullable = true)
 |-- createdTime: string (nullable = true)
 |-- convertedDate: date (nullable = true)
 |-- createdHour: integer (nullable = true)
 |-- blocks: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- MonthName: string (nullable = true)
 |-- date: string (nullable = true)
 |-- day: long (nullable = true)
 |-- month: long (nullable = true)
 |-- quarter: long (nullable = true)
 |-- weekName: string (nullable = true)
 |-- weekday: long (nullable = true)
 |-- datePart: string (nullable = true)



In [138]:
kccMergedDF = mergedKccDF.select('Sector','Category','Crops','QueryType','QueryText','kccEng','StateName','DistrictName','blocks','latitude','longitude','convertedDate','day','month','MonthName','quarter','weekName','weekDay','year')

kccMergedDF.write.partitionBy('MonthName','Sector').mode('errorifexists').parquet('kccFinalDF')

# Reporting for Data Quality and EDA

In [139]:
kccMergedDF.createOrReplaceTempView("kccModeltb")

In [141]:
sc.catalog.listTables()

[Table(name='kccmodeltb', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [149]:
Monthly_created_Query_Rep = kccMergedDF.groupby('MonthName')\
        .agg({'QueryText':'count','Crops':'count'})\
        .select('MonthName',col('count(Crops)')\
        .alias('Total_Created_Query'))\
        .sort(col('Total_Created_Query').desc())\
        #.show()

+---------+-------------------+
|MonthName|Total_Created_Query|
+---------+-------------------+
| November|                475|
| December|                428|
|  October|                280|
|     July|                215|
|   August|                206|
|     June|                206|
|September|                186|
+---------+-------------------+



In [151]:
SectorQueryReport = kccMergedDF.groupby('MonthName')\
                    .pivot('Sector')\
                    .agg({"MonthName": "count"})

SectorQueryReport.show()

+---------+-----------+----------------+------------+
|MonthName|agriculture|animal husbandry|horticulture|
+---------+-----------+----------------+------------+
|     July|        169|            null|          46|
| November|        425|               5|          45|
|  October|        239|               2|          39|
|   August|        166|               2|          38|
|     June|        173|               2|          31|
| December|        358|               1|          69|
|September|        142|            null|          44|
+---------+-----------+----------------+------------+



In [156]:
kccMergedDF.groupby('MonthName','Crops').agg(count(kccMergedDF.Crops).alias('Query_count')).show()

+---------+----------+-----------+
|MonthName|     Crops|Query_count|
+---------+----------+-----------+
|September|    others|         58|
| November|    tomato|          1|
| November|     grape|          4|
|     June|    bovine|          2|
|   August|betel vine|          1|
|  October|   brinjal|          8|
| November|     maize|         34|
|September|    bhindi|          1|
| December|     mango|          4|
|  October|    others|         47|
|September|    orange|          2|
|  October|    papaya|          2|
| November|    papaya|          1|
|  October|green gram|          2|
|     July|  oil palm|          2|
| December|     paddy|        194|
|  October|       pig|          1|
|  October|    bhindi|          1|
|   August|    papaya|          1|
|     June| carnation|          1|
+---------+----------+-----------+
only showing top 20 rows



In [181]:

sc.sql("with tmp as \
        (select month,MonthName,Crops,count(querytext) as query_count from kccModeltb \
        group by month,MonthName,Crops order by MonthName) \
        select *, Rank() over(partition by MonthName order by query_count desc) top3 from tmp")\
        .filter('top3 < 4').sort(col('month')).show()
#from pyspark.sql import Window

#window = Window.partitionBy("month").orderBy("Query_count")
#kccMergedDF.groupby('MonthName','Crops').agg(count(kccMergedDF.Crops).alias('Query_count')).withColumn("rank", func.sum("id").over(window)).sort("month").show()

+-----+---------+----------+-----------+----+
|month|MonthName|     Crops|query_count|top3|
+-----+---------+----------+-----------+----+
|    6|     June|    others|         79|   1|
|    6|     June|     paddy|         70|   2|
|    6|     June|     maize|          7|   3|
|    7|     July|     paddy|         79|   1|
|    7|     July|    others|         73|   2|
|    7|     July|    cotton|          8|   3|
|    8|   August|     paddy|         84|   1|
|    8|   August|    others|         51|   2|
|    8|   August|black gram|         14|   3|
|    9|September|     paddy|         50|   2|
|    9|September|    others|         58|   1|
|    9|September|black gram|         16|   3|
|   10|  October|     paddy|         97|   1|
|   10|  October|    others|         47|   3|
|   10|  October|black gram|         49|   2|
|   11| November|     maize|         34|   3|
|   11| November|     paddy|        231|   1|
|   11| November|    others|        123|   2|
|   12| December|     paddy|      

#kccMergedDF.createOrReplaceTempView("kccModeltb")

In [95]:
#from pyspark.sql.functions import expr,length
#sc.sql("select convertedDate from emp order by convertedDate limit 3").write.option("header",True).mode("append").csv("DQ_REPORT\ss")

#dd.write.csv(name, format='csv', mode='append', partitionBy=None, **options)

In [101]:
#sc.sql("select min(convertedDate),max(convertedDate),count(distinct(convertedDate)) AS UNIQUE_DAYS from emp")\
#.write.option("header",True).mode("append").csv("DQ_REPORT\CALENDER_REPORT")
#.show()