In [49]:
from google.cloud import bigquery, storage
from pyspark.sql import SparkSession

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import os
import datetime
import sys
import logging


def list_csv_files_in_gcs(bucket_name, directory_path=''):
    """
    List all CSV files in a GCS bucket under the specified directory path.
    """
    storage_client = storage.Client()
    try:
        bucket = storage_client.get_bucket(bucket_name)
        blobs = bucket.list_blobs(prefix=directory_path)
        csv_files = [f"gs://{bucket_name}/{blob.name}" for blob in blobs if blob.name.endswith('.csv')]
        return csv_files
    except Exception as e:
        logger.error(f"Error fetching files from GCS bucket: {e}")
        return []

def check_files_status_in_bigquery(file_name):
    """ 
    Check if the previous day's file exists in BigQuery table 'product_staging_table' with status='A'.
    If yes, abort the script; if no, insert today's file and proceed.
    """
    try:
        # Get previous day's date
        file_name = "sales_data_2024-05-31.csv"
        filename_today_date = file_name[11:21]  # Extract "2024-05-31" as a string
        filename_date = datetime.datetime.strptime(filename_today_date, "%Y-%m-%d").date()  # Convert string to datetime.date

        yesterday = filename_date - datetime.timedelta(days=1)
        previous_day_date = yesterday.strftime("%Y-%m-%d")
        new_file_name = file_name[:10]
        # Check if previous day's file exists with status 'A'
        statement = f"""
            SELECT file_name
            FROM `{database_name}.{product_staging_table}`
            WHERE file_name = '{file_name}_{previous_day_date}.csv' AND status='A'
        """
        logger.info(f"Dynamically created statement: {statement}")
   
        query_job = client.query(statement)
        results = query_job.result()

        if results.total_rows > 0:
            logger.error(f"Previous day's file '{previous_day_date}.csv' exists in product_staging_table with status 'A'. Aborting further processing.")
            return False
        else:
            logger.info(f"File '{file_name}_{previous_day_date}.csv' not found or does not have status 'A' in product_staging_table. Proceeding to next steps.")
            return True

    except Exception as e:
        logger.error(f"Error executing BigQuery query: {e}")
        return False

        
def validate_file_schemas(csv_files):
    """
    Validate schemas of CSV files against mandatory columns.
    """
    correct_files = []
    error_files = []

    for data in csv_files:
        try:
            # Here you would typically use Spark or pandas to validate the schema
            # This example assumes you are using Spark
            data_schema = spark.read.format('csv').option('header', 'true').load(data).columns
            logger.info(f"Schema for {data}: {data_schema}")
            logger.info(f"Mandatory columns schema: {mandatory_columns}")
            missing_columns = set(mandatory_columns) - set(data_schema)
            logger.info(f"Missing columns: {missing_columns}")

            if missing_columns:
                error_files.append(data)
                logger.info("Schema mismatch found, moving file to error directory.")
            else:
                logger.info("No schema mismatch found")
                correct_files.append(data)
        except Exception as e:
            logger.error(f"Error validating schema for file {data}: {e}")
            error_files.append(data)
            continue

    return correct_files, error_files

def insert_staging_table(filename):
    """
    Insert the file the status in BigQuery table 'product_staging_table' for correct files.
    """
    logger.info("**** Inserting today's file with status in the product_staging_table ****")

    if not correct_files:
        logger.info("** No files to process")
        return

    insert_statements = []
    current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    for file in correct_files:
        filename = os.path.basename(file)
        full_gcs_path = f'gs://{bucket_name}/{directory_path}/{filename}'
        statement = f"""
                       Insert into {database_name}.{product_staging_table} (file_name,file_location,created_date,status) 
                       values ('{filename}','{full_gcs_path}','{formatted_date}','A')"""

        insert_statements.append(statement)

    logger.info(f"** Insert statements created for staging table: {insert_statements} **")

    try:
        for statement in insert_statements:
            query_job = client.query(statement)
            query_job.result()

        logger.info("**** Staging table insertion  successfull ****")

    except Exception as e:
        logger.error(f"Error inserting in  staging table: {e}")
        raise


def update_staging_table(filename):
    """
    update the file  status in BigQuery table 'product_staging_table' for correct files once the process completes.
    """
    logger.info("**** Updating today's file with status in the product_staging_table ****")

    if not correct_files:
        logger.info("** No files to process")
        return

    update_statements = []
    current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    for file in correct_files:
        filename = os.path.basename(file)
        full_gcs_path = f'gs://{bucket_name}/{directory_path}/{filename}'
        statement = f"""
            UPDATE `{database_name}.{product_staging_table}`
            SET status = 'I', updated_date = TIMESTAMP '{formatted_date}'
            WHERE file_name = '{filename}'
        """

        update_statements.append(statement)

    logger.info(f"** Update statements created for staging table: {update_statements} **")

    try:
        for statement in update_statements:
            query_job = client.query(statement)
            query_job.result()

        logger.info("**** Staging table updation  successfull ****")

    except Exception as e:
        logger.error(f"Error updating in  staging table: {e}")
        raise

        
                
        
        
def extra_details_check(filename):
    logger.info("****  Fixing extra columns coming from source  ****")

    schema = StructType([
        StructField("customer_id", IntegerType(), True),
        StructField("store_id", IntegerType(), True),
        StructField("product_name", StringType(), True),
        StructField("sales_date", DateType(), True),
        StructField("sales_person_id", IntegerType(), True),
        StructField("price", FloatType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("total_cost", FloatType(), True),
        StructField("additional_column", StringType(), True)
    ])


    logger.info("**** Creating empty dataframe ****")

    final_df_to_process = spark.createDataFrame([], schema)
    final_df_to_process.show()
    for data in correct_files:
        data_df=spark.read.format('csv').option('header','true').option('inferSchema','true').load(data)
        data_schema=data_df.columns
        extra_columns=list(set(data_schema)-set(mandatory_columns))
        logger.info(f'Extra columns present at source {extra_columns}')
        #If there are extra columns it is concatanating all the extra columns seperated by , and placing it in additional column
        if extra_columns:
            data_df=data_df.withColumn("additional_column",concat_ws(", ",*extra_columns))\
                .select("customer_id","store_id","product_name","sales_date","sales_person_id","price","quantity","total_cost","additional_column")
            logger.info(f"*** Processed {data}  and added additional  columns  ***")
        # If no extra columns  then  additional columns  insert None as value
        else:
            data_df=data_df.withColumn("additional_column",lit(None)) \
            .select("customer_id", "store_id", "product_name", "sales_date", "sales_person_id", "price", "quantity",
                    "total_cost", "additional_column")


    final_df_to_process=final_df_to_process.union(data_df)
    
    return final_df_to_process

    
def load_table_into_dataframe(dataset_name, table_name):
    """
    Load data from a BigQuery table into a Spark DataFrame.
    :param dataset_name: BigQuery dataset name
    :param table_name: BigQuery table name
    :return: DataFrame with the table data
    """
    full_table_name = f"{dataset_name}.{table_name}"
    logger.info(f"*** Loading {full_table_name} data into DataFrame ***")
    
    df = spark.read.format("bigquery") \
        .option("dataset", dataset_name) \
        .option("table", table_name) \
        .load()
    
    return df

#enriching the data from different table
def dimesions_table_join(final_df_to_process,
                         customer_table_df,store_table_df,sales_team_table_df):
    logger.info("Joining the final_df_to_process with customer_table_df ")
    customer_df_join = final_df_to_process.alias("s3_data") \
        .join(customer_table_df.alias("ct"),
              col("s3_data.customer_id") == col("ct.customer_id"),"inner") \
        .drop("product_name","price","quantity","additional_column",
              "s3_data.customer_id","customer_joining_date")
    
    customer_df_join.show()
    logger.info("Joining the customer_df_join with store_table_df ")
    customer_store_df_join= customer_df_join.join(store_table_df,
                             store_table_df["id"]==customer_df_join["store_id"],
                             "inner")\
                        .drop("id","store_pincode","store_opening_date","reviews")
    customer_store_df_join.show()
    logger.info("Joining the customer_store_df_join with sales_team_table_df ")
    customer_store_sales_df_join = customer_store_df_join.join(sales_team_table_df.alias("st"),
                             col("st.id")==customer_store_df_join["sales_person_id"],
                             "inner")\
                .withColumn("sales_person_first_name",col("st.first_name"))\
                .withColumn("sales_person_last_name",col("st.last_name"))\
                .withColumn("sales_person_address",col("st.address"))\
                .withColumn("sales_person_pincode",col("st.pincode"))\
                .drop("id","st.first_name","st.last_name","st.address","st.pincode")

    return customer_store_sales_df_join


def customer_mart_calculation_table_write(final_customer_data_mart_df):
    window = Window.partitionBy("customer_id","sales_date_month")
    final_customer_data_mart = final_customer_data_mart_df.withColumn("sales_date_month",
                                           substring(col("sales_date"),1,7))\
                    .withColumn("total_sales_every_month_by_each_customer",
                                sum("total_cost").over(window))\
                    .select("customer_id", concat(col("first_name"),lit(" "),col("last_name"))
                            .alias("full_name"),"address","phone_number",
                            to_date(col("sales_date_month")),
                            col("total_sales_every_month_by_each_customer").alias("total_sales"))\
                    .distinct()
    logger.info("Final customer_data_mart data to be loadings")
    final_customer_data_mart.show()
    return final_customer_data_mart


def sales_mart_calculation_table_write(final_sales_data_mart_df):
    window = Window.partitionBy("store_id","sales_person_id","sales_month")
    final_sales_data_mart = final_sales_data_mart_df.withColumn("sales_month",
                                           substring(col("sales_date"),1,7))\
                    .withColumn("total_sales_every_month",
                                sum(col("total_cost")).over(window))\
                    .select("store_id", "sales_person_id",concat(col("sales_person_first_name"),lit(" "),col("sales_person_last_name"))
                            .alias("full_name"),
                            "sales_month",
                            col("total_sales_every_month"))\
                    .distinct()

    rank_window=Window.partitionBy("store_id","sales_month").orderBy(col("total_sales_every_month"))
    final_sales_data_mart=final_sales_data_mart.withColumn("rnk",rank().over(rank_window))\
                            .withColumn("incentive", when(col("rnk")==1,col("total_sales_every_month")*0.01)\
                            .otherwise(lit(0)))\
                            .withColumn("incentive",round(col("incentive"),2))\
                             .withColumn("total_sales",col("total_sales_every_month"))\
                            .select("store_id","sales_person_id","full_name","sales_month","total_sales","incentive")#Write the Data into MySQL customers_data_mart table
    print("Final sales_data_mart data to be loadings")
    final_sales_data_mart.show()
    return final_sales_data_mart



def write_to_bigquery(dataframe, database_name, table_name,temp_gcs_bucket ,mode="overwrite"):
    """
    Write a Spark DataFrame to a BigQuery table.
    
    :param dataframe: Spark DataFrame to write
    :param project_id: Google Cloud project ID
    :param dataset_name: BigQuery dataset name
    :param table_name: BigQuery table name
    :param mode: Write mode (default is 'overwrite')
    """
    full_table_name = f"{database_name}.{table_name}"
    logger.info(f"*** Writing data to BigQuery table {full_table_name} ***")
    
    dataframe.write.format("bigquery") \
        .option("table", full_table_name) \
        .option("temporaryGcsBucket", temp_gcs_bucket) \
        .mode(mode) \
        .save()

    logger.info(f"**** Data written to BigQuery table {full_table_name} ****")



def write_to_parquet(dataframe, path, partition):
    """
    Write a Spark DataFrame to a Parquet file.
    
    :param dataframe: Spark DataFrame to write
    :param path: Path where the Parquet file will be written
    :param mode: Write mode (default is 'overwrite')
    """
    logger.info(f"*** Writing data to {path} in Parquet format ***")
    if partition==1:
        
        dataframe.write.format("parquet")\
                .option("header","true")\
                .mode("overwrite")\
                .partitionBy("sales_month","store_id")\
                .option("path",path)\
                .save()
        logger.info(f"**** Data written to local disk at path {path} ****")
    
    else:
        dataframe.write.format("parquet")\
                .option("header","true")\
                .mode("overwrite")\
                .option("path",path)\
                .save()  
        logger.info(f"**** Data written to local disk at path {path} ****")
    
    
def process_customer_data_mart(customer_store_sales_df_join, output_path,database_name, table_name,temp_gcs_bucket):
    """
    Process and write customer data mart DataFrame to Parquet.
    :param s3_customer_store_sales_df_join: DataFrame containing joined customer, store, and sales data
    """
    logger.info("*** Preparing data for customer data mart ***")
    final_customer_data_mart_df = customer_store_sales_df_join.select(
        "ct.customer_id", "ct.first_name", "ct.last_name", "ct.address", "ct.pincode", "phone_number", "sales_date", "total_cost"
    )
    
    
    logger.info("*** Calculating customer every month puchased amount ****")
    final_customer_data_mart=customer_mart_calculation_table_write(final_customer_data_mart_df)
    logger.info("*** Final data for customer_data_mart ***")
    write_to_bigquery(final_customer_data_mart_df,database_name, table_name,temp_gcs_bucket)
    write_to_parquet(final_customer_data_mart_df, output_path,0)
 

    
def process_sales_data_mart(customer_store_sales_df_join, output_path,database_name, table_name,temp_gcs_bucket):
    """
    Process and write sales data mart DataFrame to Parquet.
    """
    logger.info("*** Preparing data for sales data mart ***")
    final_sales_data_mart_df=customer_store_sales_df_join\
                            .select("store_id","sales_person_id","sales_person_first_name"
                                    ,"sales_person_last_name","store_manager_name"
                                    ,"sales_person_address","sales_person_pincode","sales_date","total_cost",
                                    expr("substring(sales_date,1,7) as sales_month"))
    
    final_sales_data_mart=sales_mart_calculation_table_write(final_sales_data_mart_df)

    write_to_bigquery(final_sales_data_mart,database_name, table_name,temp_gcs_bucket)
    write_to_parquet(final_sales_data_mart, output_path,1)
    
# Example usage:
if __name__ == "__main__":
    try:
        
        spark = SparkSession \
          .builder \
          .master('yarn') \
          .appName('spark-bigquery-demo') \
          .getOrCreate()
        # Initialize BigQuery client
        bigquery_client = bigquery.Client()

        # Initialize logger
        logging.basicConfig(level=logging.INFO)
        logger = logging.getLogger(__name__)

        # Define constants
        database_name = "retail_dataset"
        product_staging_table = "product_staging_table"
        mandatory_columns = ["customer_id", "store_id", "product_name", "sales_date", "sales_person_id", "price", "quantity", "total_cost"]

        current_date=datetime.datetime.now()
        formatted_date=current_date.strftime("%Y-%m-%d %H:%M:%S")
        # Define GCS bucket and directory path
        # Table name
        customer_table_name = "customer"
        product_staging_table = "product_staging_table"
        product_table = "product"
        sales_team_table = "sales_team"
        store_table = "store"
        customers_data_mart="customers_data_mart"
        sales_team_data_mart="sales_team_data_mart"
        
        file_to_process = 'sales_data.csv'
        bucket_name = 'ayush_landing_storage'
        directory_path = ''  # Leave empty if you want to list from the root of the bucket
        
        temp_gcs_bucket="tempgscbucket"
        processed_gcs_bucket_name='processeddataretail'
        processed_customer_data_mart_folder='Customer'
        processed_Sales_data_mart_folder='Sales'
        # List CSV files in GCS
        csv_files = list_csv_files_in_gcs(bucket_name, directory_path)
        logger.info(f"List of CSV files in GCS bucket 'gs://{bucket_name}/{directory_path}': {csv_files}")

        # Check file statuses in BigQuery
        if check_files_status_in_bigquery(file_to_process):
            # Validate file schemas
            correct_files, error_files = validate_file_schemas(csv_files)

            # Update file statuses in BigQuery
            insert_staging_table(file_to_process)

            logger.info(f"List of correct files: {correct_files}")
            logger.info(f"List of error files: {error_files}")
            
            #Extra column check
            final_df_to_process=extra_details_check(file_to_process)
            logger.info("***** Final data generated  which will be used for  further processing ****")
            final_df_to_process.show()
            #Load data from a BigQuery table into a Spark DataFrame.
            logger.info("Load data from a BigQuery table into a Spark DataFrame.")
            customer_table_df = load_table_into_dataframe( database_name, customer_table_name)
            product_table_df = load_table_into_dataframe( database_name, product_table)
            product_staging_table_df = load_table_into_dataframe( database_name, product_staging_table)
            sales_team_table_df = load_table_into_dataframe( database_name, sales_team_table)
            store_table_df = load_table_into_dataframe( database_name, store_table)
        
            
            
            
            #Final enrictched  data
            logger.info("****  Final enriched  data ****")
            customer_store_sales_df_join=dimesions_table_join(final_df_to_process,customer_table_df,store_table_df,sales_team_table_df)
            customer_store_sales_df_join.show()

            parquet_path_customer = f"gs://{processed_gcs_bucket_name}/{processed_customer_data_mart_folder}/"
            parquet_path_sales=f"gs://{processed_gcs_bucket_name}/{processed_Sales_data_mart_folder}/"
            process_customer_data_mart(customer_store_sales_df_join,parquet_path_customer,database_name,customers_data_mart,temp_gcs_bucket)
            process_sales_data_mart(customer_store_sales_df_join,parquet_path_sales,database_name,sales_team_data_mart,temp_gcs_bucket)
            today = datetime.date.today()
            
            update_staging_table(file_to_process)
            logger.info("Process completed.")
        else:
            logger.error("Aborting further processing due to previous day's file presence.")
    except Exception as e:
        logger.error(f"Error in main script execution: {e}")


INFO:__main__:List of CSV files in GCS bucket 'gs://ayush_landing_storage/': ['gs://ayush_landing_storage/sales_data_2024-05-31.csv']
INFO:__main__:Dynamically created statement: 
            SELECT file_name
            FROM `retail_dataset.product_staging_table`
            WHERE file_name = 'sales_data_2024-05-31.csv_2024-05-30.csv' AND status='A'
        
INFO:__main__:File 'sales_data_2024-05-31.csv_2024-05-30.csv' not found or does not have status 'A' in product_staging_table. Proceeding to next steps.
INFO:__main__:Schema for gs://ayush_landing_storage/sales_data_2024-05-31.csv: ['customer_id', 'store_id', 'product_name', 'sales_date', 'sales_person_id', 'price', 'quantity', 'total_cost']
INFO:__main__:Mandatory columns schema: ['customer_id', 'store_id', 'product_name', 'sales_date', 'sales_person_id', 'price', 'quantity', 'total_cost']
INFO:__main__:Missing columns: set()
INFO:__main__:No schema mismatch found
INFO:__main__:**** Inserting today's file with status in the produc

+-----------+--------+------------+----------+---------------+-----+--------+----------+-----------------+
|customer_id|store_id|product_name|sales_date|sales_person_id|price|quantity|total_cost|additional_column|
+-----------+--------+------------+----------+---------------+-----+--------+----------+-----------------+
+-----------+--------+------------+----------+---------------+-----+--------+----------+-----------------+



INFO:__main__:Extra columns present at source []                                
INFO:__main__:***** Final data generated  which will be used for  further processing ****
INFO:__main__:Load data from a BigQuery table into a Spark DataFrame.           
INFO:__main__:*** Loading retail_dataset.customer data into DataFrame ***


+-----------+--------+------------+----------+---------------+-----+--------+----------+-----------------+
|customer_id|store_id|product_name|sales_date|sales_person_id|price|quantity|total_cost|additional_column|
+-----------+--------+------------+----------+---------------+-----+--------+----------+-----------------+
|          6|     122| clinic plus|2023-07-30|              5|  1.5|       2|       3.0|             null|
|          8|     121|       maida|2023-06-20|              3| 20.0|       9|     180.0|             null|
|         14|     123|       sugar|2023-06-30|              7| 50.0|       2|     100.0|             null|
|         15|     123|   dantkanti|2023-04-22|              8|100.0|       6|     600.0|             null|
|         20|     121| quaker oats|2023-03-09|              1|212.0|       6|    1272.0|             null|
|         14|     121|       maida|2023-05-01|              1| 20.0|       7|     140.0|             null|
|         19|     122| refined oil|20

INFO:__main__:*** Loading retail_dataset.product data into DataFrame ***
INFO:__main__:*** Loading retail_dataset.product_staging_table data into DataFrame ***
INFO:__main__:*** Loading retail_dataset.sales_team data into DataFrame ***
INFO:__main__:*** Loading retail_dataset.store data into DataFrame ***
INFO:__main__:****  Final enriched  data ****
INFO:__main__:Joining the final_df_to_process with customer_table_df 
INFO:__main__:Joining the customer_df_join with store_table_df                  


+-----------+--------+----------+---------------+----------+-----------+----------+---------+-------+-------+------------+
|customer_id|store_id|sales_date|sales_person_id|total_cost|customer_id|first_name|last_name|address|pincode|phone_number|
+-----------+--------+----------+---------------+----------+-----------+----------+---------+-------+-------+------------+
|          6|     122|2023-07-30|              5|       3.0|          6|     Romil|  Shanker|  Delhi| 122009|  9129451313|
|          8|     121|2023-06-20|              3|     180.0|          8|     Divij|    Garde|  Delhi| 122009|  9141984713|
|         14|     123|2023-06-30|              7|     100.0|         14|    Yuvaan|     Bawa|  Delhi| 122009|  9162077019|
|         15|     123|2023-04-22|              8|     600.0|         15|     Sahil|Sabharwal|  Delhi| 122009|  9174928780|
|         20|     121|2023-03-09|              1|    1272.0|         20|     Kavya|   Sachar|  Delhi| 122009|  9157628717|
|         14|   

INFO:__main__:Joining the customer_store_df_join with sales_team_table_df       


+-----------+--------+----------+---------------+----------+-----------+----------+---------+-------+-------+------------+-------+------------------+
|customer_id|store_id|sales_date|sales_person_id|total_cost|customer_id|first_name|last_name|address|pincode|phone_number|address|store_manager_name|
+-----------+--------+----------+---------------+----------+-----------+----------+---------+-------+-------+------------+-------+------------------+
|          6|     122|2023-07-30|              5|       3.0|          6|     Romil|  Shanker|  Delhi| 122009|  9129451313|  Delhi|            Nikita|
|          8|     121|2023-06-20|              3|     180.0|          8|     Divij|    Garde|  Delhi| 122009|  9141984713|  Delhi|            Manish|
|         14|     123|2023-06-30|              7|     100.0|         14|    Yuvaan|     Bawa|  Delhi| 122009|  9162077019|  Delhi|            vikash|
|         15|     123|2023-04-22|              8|     600.0|         15|     Sahil|Sabharwal|  Delhi

INFO:__main__:*** Preparing data for customer data mart ***                     
INFO:__main__:*** Calculating customer every month puchased amount ****
INFO:__main__:Final customer_data_mart data to be loadings


+-----------+--------+----------+---------------+----------+-----------+----------+---------+-------+-------+------------+-------+------------------+----------+---------+----------+----------+-------+-------+------------+-----------------------+----------------------+--------------------+--------------------+
|customer_id|store_id|sales_date|sales_person_id|total_cost|customer_id|first_name|last_name|address|pincode|phone_number|address|store_manager_name|first_name|last_name|manager_id|is_manager|address|pincode|joining_date|sales_person_first_name|sales_person_last_name|sales_person_address|sales_person_pincode|
+-----------+--------+----------+---------------+----------+-----------+----------+---------+-------+-------+------------+-------+------------------+----------+---------+----------+----------+-------+-------+------------+-----------------------+----------------------+--------------------+--------------------+
|          6|     122|2023-07-30|              5|       3.0|       

INFO:__main__:*** Final data for customer_data_mart ***                         
INFO:__main__:*** Writing data to BigQuery table retail_dataset.customers_data_mart ***


+-----------+---------------+-------+------------+-------------------------+-----------+
|customer_id|      full_name|address|phone_number|to_date(sales_date_month)|total_sales|
+-----------+---------------+-------+------------+-------------------------+-----------+
|         10| Saanvi Krishna|  Delhi|  9173121081|               2023-07-01|  1820901.0|
|         19| Indranil Dutta|  Delhi|  9120667755|               2023-03-01|  1708639.0|
|         13|   Rhea Chander|  Delhi|  9103434731|               2023-08-01|  1201057.0|
|         14|    Yuvaan Bawa|  Delhi|  9162077019|               2023-08-01|  1195927.0|
|         19| Indranil Dutta|  Delhi|  9120667755|               2023-04-01|  1761097.5|
|          3|   Vidur Mammen|  Delhi|  9119017511|               2023-06-01|  1777512.0|
|         20|   Kavya Sachar|  Delhi|  9157628717|               2023-03-01|  1741902.5|
|         19| Indranil Dutta|  Delhi|  9120667755|               2023-07-01|  1800073.0|
|         20|   Kavya

INFO:__main__:**** Data written to BigQuery table retail_dataset.customers_data_mart ****
INFO:__main__:*** Writing data to gs://processeddataretail/Customer/ in Parquet format ***
INFO:__main__:**** Data written to local disk at path gs://processeddataretail/Customer/ ****
INFO:__main__:*** Preparing data for sales data mart ***


Final sales_data_mart data to be loadings


INFO:__main__:*** Writing data to BigQuery table retail_dataset.sales_team_data_mart ***


+--------+---------------+--------------+-----------+-----------+---------+
|store_id|sales_person_id|     full_name|sales_month|total_sales|incentive|
+--------+---------------+--------------+-----------+-----------+---------+
|     121|              1|   Rahul Verma|    2023-03|  3799235.0| 37992.35|
|     121|              2|   Priya Singh|    2023-03|  3886106.5|      0.0|
|     121|              3|   Amit Sharma|    2023-03|  3900652.0|      0.0|
|     121|              1|   Rahul Verma|    2023-04|  3785396.5| 37853.97|
|     121|              3|   Amit Sharma|    2023-04|  3920749.0|      0.0|
|     121|              2|   Priya Singh|    2023-04|  3927510.5|      0.0|
|     121|              2|   Priya Singh|    2023-05|  4027923.5| 40279.24|
|     121|              1|   Rahul Verma|    2023-05|  4118558.0|      0.0|
|     121|              3|   Amit Sharma|    2023-05|  4124791.0|      0.0|
|     122|              4|   Sneha Gupta|    2023-07|  3983610.5| 39836.11|
|     122|  

INFO:__main__:**** Data written to BigQuery table retail_dataset.sales_team_data_mart ****
INFO:__main__:*** Writing data to gs://processeddataretail/Sales/ in Parquet format ***
INFO:__main__:**** Data written to local disk at path gs://processeddataretail/Sales/ ****
INFO:__main__:**** Updating today's file with status in the product_staging_table ****
INFO:__main__:** Update statements created for staging table: ["\n            UPDATE `retail_dataset.product_staging_table`\n            SET status = 'I', updated_date = TIMESTAMP '2024-06-01 10:04:29'\n            WHERE file_name = 'sales_data_2024-05-31.csv'\n        "] **
INFO:__main__:**** Staging table updation  successfull ****
INFO:__main__:Process completed.


'2024-05-31'

sales_data_2024-05-30
