In [10]:
pip install s3fs

IOStream.flush timed out
Note: you may need to restart the kernel to use updated packages.


In [4]:
import re
import os
# import s3fs
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from general_functions.general_functions import *
import matplotlib.pyplot as plt
import matplotlib.pyplot as pyplt
import numpy as np
from scipy.interpolate import make_interp_spline
import datetime
from scipy.ndimage import gaussian_filter1d

from pyspark.sql.window import Window
from pyspark.sql.functions import *

logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
logger = logging.getLogger('Data_Processing')
logger.info('main.py Script started')

ModuleNotFoundError: No module named 'general_functions'

In [5]:
def generating_publisher_id_with_maximum_queries(spark, destination_path):
    try:
        """
        This function is used to transform the clean data, to get the top-5 publishers. 
        Parameters:
        destination_path: This is the path of clean folder where data is being saved. 
        spark: spark object
        returns: transformed pyspark-dataframe
        """
        df  = spark.read.parquet(destination_path)
        logger.info(f"data loaded successfully from path:{destination_path}")
        df = df.withColumn("file_creation_date", date_format('file_creation_date', "yyyy-MM-dd"))
        window_spec = Window.partitionBy('publisher_id').orderBy("publisher_id")
        x = df.withColumn("total_count_of_click", sum(col('total_clicks')).over(window_spec))
        rank_spec = Window.partitionBy().orderBy(desc(col("total_count_of_click")))
        x = x.withColumn("rank", dense_rank().over(rank_spec))
        x = x.filter(x.rank<=5)
        unique_publisher_id = list(set(x.select("publisher_id").rdd.flatMap(lambda x: x).collect()))
        v = df.filter(df.publisher_id.isin(unique_publisher_id))
        v = v.groupBy("publisher_id", "file_creation_date").agg(sum(col("total_clicks")))
        v = v.sort("publisher_id", "file_creation_date", ascending=True)
        v.show()
        v.printSchema()
        return v
    except Exception as e:
        logger.info(f"Error has been encountered at generating_publisher_id_with_maximum_queries {e}")

def generatring_line_graph_for_top_5_publishers(df,destination_path):
    try:
        """
        This function is used to generate the line_graph and save it graph folder.
        Parameters:
        destination_path[String]: Path of graph folder to save the output line-graph.
        df[Pyspark-dataframe]: Dataframe.
        returns: None
        """
        pyplt.rcParams["figure.figsize"] = (50,15)
        plt.rcParams["figure.autolayout"] = True
        plt.set_loglevel('WARNING')

        df = df.toPandas()
        x_axis = sorted(df['file_creation_date'].drop_duplicates().to_list())
        print(x_axis)
        # fig, ax = plt.subplots(figsize=(5, 2.7), layout='constrained')
        color_schema = ['r','y','g','c','k']
        # unique_publisher_id = sorted(list(set(df.select("publisher_id").rdd.flatMap(lambda x: x).collect())))
        
        unique_publisher_id = sorted(df['publisher_id'].drop_duplicates().to_list())
        for i in range(len(unique_publisher_id)):
            print(unique_publisher_id[i])

            x_axis = np.array(sorted(df[df['publisher_id'] == unique_publisher_id[i]]['file_creation_date'].drop_duplicates().to_list()))

            y_axis = np.array(df[df['publisher_id'] == unique_publisher_id[i]]['sum(total_clicks)'].to_list())//1000

            y_smooth = gaussian_filter1d(y_axis, sigma=1)

            # Plot smooth curve
            plt.plot(x_axis, y_smooth,  f'x-{color_schema[i]}',label=unique_publisher_id[i], linewidth=4)


        plt.xlabel('Date', size = 50, labelpad=38)
        plt.ylabel('Clicks (x 1000)', size = 50, labelpad= 38)
        plt.title('QPS', size = 50, pad = 6)
        plt.xticks(fontsize=42,rotation=45,ha='right')
        plt.yticks(fontsize=42)
        specific_y_ticks = np.arange(0, 1200, 100)

        plt.gca().set_yticks(specific_y_ticks)
        plt.grid(visible = True,axis='y', which='both',color='k', linestyle='-', linewidth=0.6, in_layout=True)
        plt.grid(visible = True,axis='x', which='both',color='k', linestyle='-', linewidth=0.6, in_layout=True)
        plt.legend(prop={'size':50})
        os.makedirs("graph",exist_ok=True)
        plt.savefig(f'{destination_path}/graph/line_graph.png', dpi=300, bbox_inches='tight')
        plt.show()
    except Exception as e:
        logger.info(f"Error has been encountered at generatring_line_graph_for_top_5_publishers {e}")

def apply_transformations(spark,destination_path):
    try:
        """
        This function is used to transform raw data received in Landing folder and save it in clean folder as parquet format, with partitionBy date.
        Parameters:
        destination_path: This is the path of clean folder where data is being saved. 
        spark: spark object
        returns: transformed pyspark-dataframe
        """
        print(destination_path)
        folder_path = []
        # actual_path = destination_path.replace("\\", "/")+'/Landing/click_log/2024/05/'
        actual_path = destination_path
        for outside in os.listdir(actual_path):
            for inside in os.listdir(f'{actual_path}/{outside}'):
                folder_path.append(f'{outside}/{inside}')


        for path in folder_path:
            logger.info(f"currently working on folder:{path}")
            # df = spark.read.option("inferSchema", True).option("mode", "PERMISSIVE").json(f"{actual_path}{path}/")
            spark.conf.set("spark.sql.streaming.schemaInference", True)
            df = (spark.readStream.option("cleanSource","archive").option("sourceArchiveDir", "archive_dir").option("maxFilesPerTrigger", 1).format("json").load("C:/Users/Admin/Downloads/backup/root/stream_landing"))
            df = df.select('*', "ip_geo.*", "query.*").drop("query", "ip_geo")
            df = df.toDF(*get_unique_column_names(df.columns))

            df = df.drop(*get_duplicate_column_names(df))

            df = df.withColumn("real_filepath", input_file_name())

            df = df.withColumn("actual_file" , split(df.real_filepath, '/',limit=-1))
            df = df.withColumn("count_file", size(df.actual_file))
            df = df.withColumn("actual_file" , df.actual_file[col("count_file")-1]).drop("count_file")
            df = df.withColumn("file_creation_date", get_file_generation_date_udf(col("actual_file")))
            df = df.withColumn("file_creation_date", date_format(to_timestamp("file_creation_date", "yyyy-MM-dd HH-mm"), "yyyy-MM-dd HH:mm"))
            publisher_id  = get_publisher_id_column_name(df)
            df = df.na.fill("null")
            # print("this is the column structure", df.columns)
            df = df.withColumnRenamed(publisher_id, "publisher_id")
            df = df.select("publisher_id", "file_creation_date", "actual_file")
            df = df.withColumn("publisher_id", when(length(col("publisher_id")) > 6, regexp_extract(col("publisher_id"), "^(va-\d{3})|^(VA-\d{3})",0)).otherwise(col("publisher_id")))
            
            
            df = df.groupBy("publisher_id", "file_creation_date", "actual_file").agg(count("publisher_id").alias("total_clicks"))
            # # print("this is the window function count", df.count())
            df = df.withColumn("date", split(col("file_creation_date"), " ").getItem(0))
            df = df.withColumn("date", to_timestamp("date", "yyyy-MM-dd"))
            df = df.withColumn("path", lit(path))
            df.printSchema()

            # df.write.partitionBy("date").mode("append").format("parquet").save(str(os.getcwd()).replace("\\", "/")+f'/clean1')
            streaming_df.writeStream.format("console").option("checkpointLocation", "C:/Users/Admin/Downloads/backup/checkpoint/").outputMode("append").start().awaitTermination()
            logger.info(f"successfully saved data of {path} with partiton column date")
        return str(os.getcwd()).replace("\\", "/")+f'/clean1'
    except Exception as e:
        logger.info(f"Error has been encountered at apply_transformations {e}")


In [6]:
# try:
#     if __name__ == "__main__":

#         #public s3 path. 
#         s3_path = 'datasci-assignment/click_log/2024/05/'

#         #Create S3 object to read from public S3-bucket.
#         s3 = s3fs.S3FileSystem(anon =  True)

#         #getting currect working directory to save the files in landing location. 
#         currect_working_directory = os.getcwd().replace("\\", "/")
#         logger.info(f"ingestion of data started from s3-path {s3_path}")

#         #Start INgesting data in Landing folder.
#         destination_path  = ingest_data_from_s3(currect_working_directory, s3, s3_path)
#         logger.info(f"ingestion of data Completed successfully at location {destination_path}")


#         spark = SparkSession.builder.master("local[*]").appName("Batch_procecssing_pipeline_from_s3").config("spark.sql.legacy.timeParserPolicy","LEGACY").getOrCreate()
#         logger.info(f"SparkSession Created Successfully")
        
#         logger.info(f"apply_transformations function started successfully reading data from location : {destination_path}")
#         destination_path = apply_transformations(spark,destination_path)
#         logger.info(f"apply_transformations function completed saved parquet at location: {destination_path}")
#         df = generating_publisher_id_with_maximum_queries(spark, destination_path)
#         logger.info("generating_publisher_id_with_maximum_queries function runned successfully")
#         df.coalesce(1).write.mode("overwrite").csv("top_5_publishers_id_data")
#         logger.info("top-5 publishers_id saved in csv file")
#         generatring_line_graph_for_top_5_publishers(df, os.getcwd())
#         logger.info(f"generatring_line_graph_for_top_5_publishers function completed saved parquet at location: {destination_path}")
            
# except Exception as e:
#         logger.info(f"Error has been encountered at main {e}")

In [3]:
# destination_path = apply_transformations(spark,destination_path)
# while True:
#     print("yes")

logger.info(os.getcwd())

spark = SparkSession.builder.master("local[*]").appName("Batch_procecssing_pipeline_from_s3").config("spark.sql.legacy.timeParserPolicy","LEGACY").getOrCreate()
spark.conf.set("spark.sql.streaming.schemaInference", True)
df = (spark.readStream.option("cleanSource","archive").option("sourceArchiveDir", "./archived/here/").option("maxFilesPerTrigger", 1).format("json").load(f"{os.getcwd()}/landing/"))
df.printSchema()
df = df.select('*', "ip_geo.*", "query.*").drop("query", "ip_geo")
df = df.toDF(*get_unique_column_names(df.columns))

df = df.drop(*get_duplicate_column_names(df))

df = df.withColumn("real_filepath", input_file_name())
df.printSchema()
df = df.withColumn("click_time" ,col("click_time_1").cast("timestamp"))
df.printSchema()
# df.show()
df = df.withColumn("actual_file" , split(df.real_filepath, '/',limit=-1))
df = df.withColumn("count_file", size(df.actual_file))
df = df.withColumn("actual_file" , df.actual_file[col("count_file")-1]).drop("count_file")
df = df.withColumn("file_creation_date", get_file_generation_date_udf(col("actual_file")))
df = df.withColumn("file_creation_date", to_timestamp("file_creation_date", "yyyy-MM-dd HH-mm"))
publisher_id  = get_publisher_id_column_name(df)
df = df.na.fill("null")
# df.printSchema()
# # print("this is the column structure", df.columns)
df = df.withColumnRenamed(publisher_id, "publisher_id")
df = df.select("publisher_id", "file_creation_date", "actual_file","click_time")
df = df.withColumn("publisher_id", when(length(col("publisher_id")) > 6, regexp_extract(col("publisher_id"), "^(va-\d{3})|^(VA-\d{3})",0)).otherwise(col("publisher_id")))

# df.printSchema()
df = df.withWatermark("click_time", "10 minutes").groupBy(window("click_time", "10 minutes"),"publisher_id", "file_creation_date", "actual_file").agg(count("publisher_id").alias("total_clicks"))
# print("this is the window function count", df.count())
df = df.withColumn("date", split(col("file_creation_date"), " ").getItem(0))
df = df.withColumn("date", to_timestamp("date", "yyyy-MM-dd"))
# df = df.withColumn("path", lit(path))
# df.printSchema()

# df.write.partitionBy("date").mode("append").format("parquet").save(str(os.getcwd()).replace("\\", "/")+f'/clean1')
df.writeStream.format("console").option("checkpointLocation", f"{os.getcwd()}/checkpoint/").trigger(processingTime="30 seconds").outputMode("append").start().awaitTermination()
# .option("path", f"{os.getcwd()}/output/")
# .option("path", f"{os.getcwd()}/output/")
# logger.info(f"successfully saved data of {path} with partiton column date")

NameError: name 'logger' is not defined

In [14]:
spark = SparkSession.builder.master("local[*]").appName("Batch_procecssing_pipeline_from_s3").config("spark.sql.legacy.timeParserPolicy","LEGACY").getOrCreate()
spark.conf.set("spark.sql.streaming.schemaInference", True)
df = (spark.readStream.option("cleanSource","archive").option("sourceArchiveDir", "./archived/here/").option("maxFilesPerTrigger", 1).format("json").load("C:/Users/Admin/Downloads/stream_task/stream_landing/"))
df.printSchema()

root
 |-- campaign_id: string (nullable = true)
 |-- click_time: long (nullable = true)
 |-- creative_id: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- gaid: string (nullable = true)
 |-- idfa: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- ip_geo: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- city_name: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- lat: string (nullable = true)
 |    |-- lon: string (nullable = true)
 |    |-- metro_code: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- isGlobalIpFlag: long (nullable = true)
 |-- p_click_id: string (nullable = true)
 |-- publisher_id: string (nullable = true)
 |-- query: struct (nullable = true)
 |    |-- campaign_id: string (nullable = true)
 |    |-- click_id: string (nullable = true)
 |    |-- creative_id: string (nullable = true)
 |    |-- gaid: string (nullable = t