In [174]:
#library import
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark=SparkSession.builder.master("local").appName("csv_to_parquet_landing").getOrCreate()

In [189]:
#script execution for raw data to landing
class CSV_to_parquet:
    
    """Script to read raw file from local path to landing.."""
    
    def __init__(self,file_parameter):
        
        """function to get input parameters to read the file from local path"""
        self.file_parameter=file_parameter
        
        """read raw data from local"""
    def read_csv(self,fileformat,file_path):
        return spark.read.format(fileformat).option("header","True").load(self.file_parameter['src_base_read_path']+file_path)
        
        """save file as parquet and also save as table in local db"""
    def write_parquet(self,df,dest_path,table_name):  
        df.write.mode("overwrite").parquet(dest_path)
        df.write.format(self.file_parameter['parquet']).mode("overwrite").option("path",dest_path)\
        .saveAsTable("school_landing.{0}".format(table_name))
        spark.read.parquet(dest_path).show()

        #spark.sql("select * FROM school_landing.{0}".format(table_name)).show(df.count(),truncate=False)
        
        """creating temporary view to add audit columns- created timestamp,creator name,updator name"""
    def create_view(self,df,schemas):
        
        df1 = spark.createDataFrame(data=df.rdd,schema=schemas)
        #df1.show()
        df1.createOrReplaceTempView(self.file_parameter['temp_table'])
        temp_df = spark.sql("select * from {0}".format(self.file_parameter['temp_table']))
        df__audit_user=temp_df.withColumn(self.file_parameter['audit_created_col_name'],lit(self.file_parameter['audit_created_username'])) \
        .withColumn(self.file_parameter['audit_created_timestamp'],current_timestamp()) \
        .withColumn(self.file_parameter['audit_updated_col_name'],lit(self.file_parameter['audit_updated_username'])) \
        .withColumn(self.file_parameter['audit_updated_timestamp'],current_timestamp())
        #df__audit_user.show(df__audit_user.count(),truncate=False)
        return df__audit_user
    

    
    def main_block(self):
        
        marks_schema = StructType([ \
            StructField("exam_date",StringType(),True), \
            StructField("marks",StringType(),True), \
            StructField("student_id",StringType(),True), \
            StructField("subject_id",StringType(),True), \
            ])
        students_schema = StructType([ \
            StructField("student_name",StringType(),True), \
            StructField("student_id",StringType(),True), \
          
            ])
        subjects_schema = StructType([ \
             StructField("subject_name",StringType(),True), \
            StructField("subject_id",StringType(),True), \
            ])        
        """main execution block"""
        spark.sql("create database if not exists {0}".format(self.file_parameter['local_db_name']))
        df1=self.read_csv(self.file_parameter['csv'],self.file_parameter['read_filename_marks']);
        df1.show()
        parquet_df1 = self.create_view(df1,marks_schema);
        self.write_parquet(parquet_df1,self.file_parameter['dest_base_write_path']+self.file_parameter['write_filename_marks'],self.file_parameter['table_name_marks']);
        
        df2=self.read_csv(self.file_parameter['csv'],self.file_parameter['read_filename_student']);
        parquet_df2 = self.create_view(df2,students_schema);
        self.write_parquet(parquet_df2, self.file_parameter['dest_base_write_path']+self.file_parameter['write_filename_student'],self.file_parameter['table_name_students']);
        
        df3=self.read_csv(self.file_parameter['csv'],self.file_parameter['read_filename_subject'],);
        parquet_df3 = self.create_view(df3,subjects_schema);
        self.write_parquet(parquet_df3,self.file_parameter['dest_base_write_path']+self.file_parameter['write_filename_subject'],self.file_parameter['table_name_subject']);
        
        
        

In [191]:
if __name__=='__main__':
    
    file_read={
        "csv_format":".csv",
        "csv":"csv",
        "parquet_format":".parquet",
        "parquet":"parquet",
        "table_name_marks":"marks",
        "table_name_students":"students",
        "table_name_subject":"subjects",
        "read_filename_marks":"/marks/markdetails.csv",
        "read_filename_student":"/student/studentdetails.csv",
        "read_filename_subject":"/subject/subjectdetails.csv",
        "write_filename_marks":"/marks/markdetails.parquet",
        "write_filename_student":"/student/studentdetails.parquet",
        "write_filename_subject":"/subject/subjectdetails.parquet",
        "src_base_read_path":"../../../../git projects/big_data_project_git/data_files/pre_landing/raw_files/school",
        "dest_base_write_path":"../../../../git projects/big_data_project_git/data_files/landing/school_landing",
        "src_path":"../../../../git projects/big_data_project_git/data_files/pre_landing/raw_files/school/marks/markdetails.csv",
        "dest_path":"../../../../git projects/big_data_project_git/data_files/landing/school_landing/marks/markdetails.parquet",
        "temp_table":"school_table",
        "local_db_name":"school_landing",
        "audit_created_username":"vasanth",
        "audit_created_col_name":"audit_created_username",
        "audit_created_timestamp":"audit_created_timestamp",
        "audit_updated_username":"vasanth",
        "audit_updated_col_name":"audit_updated_username",
        "audit_updated_timestamp":"audit_updated_timestamp",

    }
   
    

    landing_obj1=CSV_to_parquet(file_read)
    landing_obj1.main_block()
   

+----------+---+----+----+
|11-26-2021| 79|1682|7001|
+----------+---+----+----+
|11-26-2021| 50|1682|7002|
|11-26-2021| 79|1682|7003|
|11-26-2021| 55|1682|7004|
|12-26-2021| 71|1682|7001|
|12-26-2021| 81|1682|7002|
|12-26-2021| 92|1682|7003|
|12-26-2021| 64|1682|7004|
|01-26-2022| 64|1682|7001|
|01-26-2022| 83|1682|7002|
|01-26-2022| 64|1682|7003|
|01-26-2022| 94|1682|7004|
|11-26-2021| 84|2180|7001|
|11-26-2021| 72|2180|7002|
|11-26-2021| 89|2180|7003|
|11-26-2021| 92|2180|7004|
|12-26-2021| 95|2180|7001|
|12-26-2021| 49|2180|7002|
|12-26-2021| 93|2180|7003|
|12-26-2021| 55|2180|7004|
|01-26-2022| 50|2180|7001|
+----------+---+----+----+
only showing top 20 rows



22/02/07 18:40:44 WARN HadoopFSUtils: The directory file:/Users/vasanth_ku/Vasanth/spark-apache/git%20projects/big_data_project_git/git projects/big_data_project_git/data_files/landing/school_landing/marks/markdetails.parquet was not found. Was it deleted very recently?


+----------+-----+----------+----------+----------------------+-----------------------+----------------------+-----------------------+
| exam_date|marks|student_id|subject_id|audit_created_username|audit_created_timestamp|audit_updated_username|audit_updated_timestamp|
+----------+-----+----------+----------+----------------------+-----------------------+----------------------+-----------------------+
|11-26-2021|   50|      1682|      7002|               vasanth|   2022-02-07 18:40:...|               vasanth|   2022-02-07 18:40:...|
|11-26-2021|   79|      1682|      7003|               vasanth|   2022-02-07 18:40:...|               vasanth|   2022-02-07 18:40:...|
|11-26-2021|   55|      1682|      7004|               vasanth|   2022-02-07 18:40:...|               vasanth|   2022-02-07 18:40:...|
|12-26-2021|   71|      1682|      7001|               vasanth|   2022-02-07 18:40:...|               vasanth|   2022-02-07 18:40:...|
|12-26-2021|   81|      1682|      7002|               