In [1]:
%pip install Faker



In [2]:
pip install findspark



In [3]:
pip install boto3




In [4]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=72af6bc14e8fd5393e58e0d240ced2b06939bd993fd7220ca8f6839c5cbc0fa1
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from faker import Faker
from pyspark.sql.functions import col, year, month, dayofmonth
import random
import re
import os
import boto3
from io import StringIO
from datetime import datetime
import pandas as pd


In [5]:
spark = SparkSession.builder.appName("RandomDataGeneration").getOrCreate()
fake = Faker()

In [6]:
def generate_random_data(col_name):
    if col_name == 'touch_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'touch_name':
        return fake.catch_phrase()
    elif col_name == 'touch_description':
        return fake.sentence(nb_words=6)
    elif col_name == 'touch_start_date':
        return fake.date_time_this_decade().isoformat()
    elif col_name == 'touch_end_date':
        return fake.date_time_this_decade().isoformat()
    elif col_name == 'touch_reply_by_date':
        return fake.date_time_this_decade().isoformat()
    elif col_name == 'offer_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'offer_name':
        return fake.catch_phrase()
    elif col_name == 'keycode_type':
        return fake.word(ext_word_list=['Type1', 'Type2', 'Type3'])
    elif col_name == 'sub_channel_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'sub_channel_name':
        return fake.catch_phrase()
    elif col_name == 'marketing_channel_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'marketing_channel_name':
        return fake.catch_phrase()
    elif col_name == 'creative_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'creative_name':
        return fake.catch_phrase()
    elif col_name == 'variant_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'variant_name':
        return fake.catch_phrase()
    elif col_name == 'variant_desc':
        return fake.sentence(nb_words=6)
    elif col_name == 'keycode_10th_position_byte':
        return fake.random_int(min=1000000000, max=9999999999)
    elif col_name == 'variant_population_split':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'cell_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'cell_name':
        return fake.catch_phrase()
    elif col_name == 'cell_description':
        return fake.sentence(nb_words=6)
    elif col_name == 'selection_group_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'selection_group_name':
        return fake.catch_phrase()
    elif col_name == 'selection_parameters_name':
        return fake.catch_phrase()
    elif col_name == 'tactic_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'tactic_name':
        return fake.catch_phrase()
    elif col_name == 'tactic_description':
        return fake.sentence(nb_words=6)
    elif col_name == 'tactic_start_date':
        return fake.date_this_decade().strftime('%Y-%m-%d')
    elif col_name == 'tactic_type_name':
        return fake.word(ext_word_list=['TypeA', 'TypeB', 'TypeC'])
    elif col_name == 'cross_sell_keycode_value':
        return fake.random_int(min=1000000000, max=9999999999)
    elif col_name == 'campaign_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'campaign_name':
        return fake.catch_phrase()
    elif col_name == 'campaign_number':
        return fake.catch_phrase()
    elif col_name == 'campaign_description':
        return fake.sentence(nb_words=6)
    elif col_name == 'campaign_start_date':
        return fake.date_time_this_decade().isoformat()
    elif col_name == 'campaign_end_date':
        return fake.date_time_this_decade().isoformat()
    elif col_name == 'finance_year':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'finance_month':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'campaign_keycode_camp_number':
        return fake.random_int(min=1000000000, max=9999999999)
    elif col_name == 'initiative_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'initiative_description':
        return fake.sentence(nb_words=6)
    elif col_name == 'initiative_start_date':
        return fake.date_time_this_decade().isoformat()
    elif col_name == 'initiative_end_date':
        return fake.date_time_this_decade().isoformat()
    elif col_name == 'marketing_program_id':
        return fake.random_int(min=1, max=10000000000000000)
    elif col_name == 'marketing_program_name':
        return fake.catch_phrase()
    elif col_name == 'marketing_program_description':
        return fake.sentence(nb_words=6)
    elif col_name == 'file_name':
        return fake.file_name(category='text')
    elif col_name == 'load_timestamp':
        return fake.date_time_this_decade().isoformat()
    elif col_name == 'campaign_number_new':
        return fake.random_int(min=1, max=10000000000000000)
    else:
        return None

In [7]:
generate_random_data_udf = udf(generate_random_data)


In [8]:
schema = StructType([
    StructField('touch_id', LongType(), False),
    StructField('touch_name', StringType(), True),
    StructField('touch_description', StringType(), True),
    StructField('touch_start_date', TimestampType(), True),
    StructField('touch_end_date', TimestampType(), True),
    StructField('touch_reply_by_date', TimestampType(), True),
    StructField('offer_id', LongType(), True),
    StructField('offer_name', StringType(), True),
    StructField('keycode_type', StringType(), True),
    StructField('sub_channel_id', LongType(), True),
    StructField('sub_channel_name', StringType(), True),
    StructField('marketing_channel_id', LongType(), True),
    StructField('marketing_channel_name', StringType(), True),
    StructField('creative_id', LongType(), True),
    StructField('creative_name', StringType(), True),
    StructField('variant_id', LongType(), True),
    StructField('variant_name', StringType(), True),
    StructField('variant_desc', StringType(), True),
    StructField('keycode_10th_position_byte', StringType(), True),
    StructField('variant_population_split', LongType(), True),
    StructField('cell_id', LongType(), True),
    StructField('cell_name', StringType(), True),
    StructField('cell_description', StringType(), True),
    StructField('selection_group_id', LongType(), True),
    StructField('selection_group_name', StringType(), True),
    StructField('selection_parameters_name', StringType(), True),
    StructField('tactic_id', LongType(), True),
    StructField('tactic_name', StringType(), True),
    StructField('tactic_description', StringType(), True),
     StructField('tactic_start_date', StringType(), True),
    StructField('tactic_type_name', StringType(), True),
    StructField('cross_sell_keycode_value', StringType(), True),
    StructField('campaign_id', LongType(), True),
    StructField('campaign_name', StringType(), True),
    StructField('campaign_number', StringType(), True),
    StructField('campaign_description', StringType(), True),
    StructField('campaign_start_date', TimestampType(), True),
    StructField('campaign_end_date', TimestampType(), True),
    StructField('finance_year', LongType(), True),
    StructField('finance_month', LongType(), True),
    StructField('campaign_keycode_camp_number', StringType(), True),
    StructField('initiative_id', LongType(), True),
    StructField('initiative_description', StringType(), True),
    StructField('initiative_start_date', TimestampType(), True),
    StructField('initiative_end_date', TimestampType(), True),
    StructField('marketing_program_id', LongType(), True),
    StructField('marketing_program_name', StringType(), True),
    StructField('marketing_program_description', StringType(), True),
    StructField('file_name', StringType(), True),
    StructField('load_timestamp', TimestampType(), True),
    StructField('campaign_number_new', LongType(), True)
])

In [9]:
df = spark.range(200).withColumn('id', lit(0).cast(IntegerType()))

# Generate random data for each column using the UDF

for col_name in schema.names:
    df = df.withColumn(col_name, generate_random_data_udf(lit(col_name)).cast(schema[col_name].dataType))

# Drop the 'id' column as it was added temporarily for generating rows
df = df.drop("id")

# Show the resulting DataFrame
df.show()

+----------------+--------------------+--------------------+-------------------+-------------------+-------------------+----------------+--------------------+------------+----------------+--------------------+--------------------+----------------------+----------------+--------------------+----------------+--------------------+--------------------+--------------------------+------------------------+----------------+--------------------+--------------------+------------------+--------------------+-------------------------+----------------+--------------------+--------------------+-----------------+----------------+------------------------+----------------+--------------------+--------------------+--------------------+-------------------+-------------------+----------------+----------------+----------------------------+----------------+----------------------+---------------------+-------------------+--------------------+----------------------+-----------------------------+---------------

In [16]:
pandas_df = df.toPandas()
pdf=pandas_df

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [17]:
pdf['tactic_start_date'] = pd.to_datetime(pdf['tactic_start_date'])


In [19]:
df_with_partition = pdf.copy()
df_with_partition['year'] = df_with_partition['tactic_start_date'].dt.year
df_with_partition['month'] = df_with_partition['tactic_start_date'].dt.month
df_with_partition['day'] = df_with_partition['tactic_start_date'].dt.day

In [22]:
import pandas as pd
import boto3
from io import BytesIO
from datetime import datetime

In [30]:
aws_access_key_id = 'AKIAWS56NT65JPA4FLGB'
aws_secret_access_key = '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'


bucket_name = 'aaa-dbt-pocs-raw-data'
def upload_to_s3_parquet(df, bucket_name, s3_prefix):
    s3 = boto3.client('s3',
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key)

    # Convert DataFrame to Parquet format and partition by 'year', 'month', and 'day'
    for partition_name, partition_df in df.groupby(['year', 'month', 'day']):
        year_folder = str(partition_name[0])
        month_folder = str(partition_name[1]).zfill(2)
        day_folder = str(partition_name[2]).zfill(2)
        file_name = f"{year_folder}-{month_folder}-{day_folder}.parquet"
        folder_path = f"{s3_prefix}/{year_folder}/{month_folder}/{day_folder}/"

        # Write DataFrame to Parquet in-memory buffer
        parquet_buffer = BytesIO()
        partition_df.drop('tactic_start_date', axis=1).to_parquet(parquet_buffer, index=False, engine='pyarrow')

        # Upload the Parquet buffer to S3
        s3.upload_fileobj(parquet_buffer, bucket_name, f"{folder_path}{file_name}")
        print(f"Uploaded {file_name} to {folder_path}")

# Replace 'https://aaa-dbt-pocs-raw-data.s3.amazonaws.com/promotion_history/' with your actual S3 prefix
s3_prefix = 'nakul/'

# Call the function to upload the DataFrame partitions to S3 in Parquet format
upload_to_s3_parquet(df_with_partition, bucket_name, s3_prefix)


Uploaded 2020-01-08.parquet to nakul//2020/01/08/
Uploaded 2020-01-16.parquet to nakul//2020/01/16/
Uploaded 2020-01-20.parquet to nakul//2020/01/20/
Uploaded 2020-02-10.parquet to nakul//2020/02/10/
Uploaded 2020-02-17.parquet to nakul//2020/02/17/
Uploaded 2020-03-14.parquet to nakul//2020/03/14/
Uploaded 2020-03-22.parquet to nakul//2020/03/22/
Uploaded 2020-04-06.parquet to nakul//2020/04/06/
Uploaded 2020-04-19.parquet to nakul//2020/04/19/
Uploaded 2020-04-22.parquet to nakul//2020/04/22/
Uploaded 2020-04-23.parquet to nakul//2020/04/23/
Uploaded 2020-05-31.parquet to nakul//2020/05/31/
Uploaded 2020-06-03.parquet to nakul//2020/06/03/
Uploaded 2020-06-29.parquet to nakul//2020/06/29/
Uploaded 2020-07-05.parquet to nakul//2020/07/05/
Uploaded 2020-07-15.parquet to nakul//2020/07/15/
Uploaded 2020-07-21.parquet to nakul//2020/07/21/
Uploaded 2020-07-29.parquet to nakul//2020/07/29/
Uploaded 2020-07-31.parquet to nakul//2020/07/31/
Uploaded 2020-08-14.parquet to nakul//2020/08/14/


In [21]:

aws_access_key_id = 'AKIAWS56NT65JPA4FLGB'
aws_secret_access_key = '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'


bucket_name = 'aaa-dbt-pocs-raw-data'



def upload_to_s3(df, bucket_name, s3_prefix):
    s3 = boto3.resource('s3',
                        aws_access_key_id=aws_access_key_id,
                        aws_secret_access_key=aws_secret_access_key)

    for _, row in df.iterrows():
        date = row['tactic_start_date']
        year_folder = date.strftime('%Y')
        month_folder = date.strftime('%m')
        day_folder = date.strftime('%d')
        file_name = f"{date.strftime('%Y-%m-%d')}.csv"
        folder_path = f"{s3_prefix}/{year_folder}/{month_folder}/{day_folder}"

        csv_buffer = StringIO()
        row.drop('tactic_start_date', inplace=True)
        row.to_csv(csv_buffer, index=False)

        s3.Object(bucket_name, f"{folder_path}/{file_name}").put(Body=csv_buffer.getvalue())

# Replace 'path/to/your/desired/s3_prefix' with your actual S3 prefix
s3_prefix = 'https://aaa-dbt-pocs-raw-data.s3.amazonaws.com/promotion_history/'

# Call the function to upload the DataFrame partitions to S3
upload_to_s3(df_with_partition, bucket_name, s3_prefix)

In [11]:
spark.conf.set("spark.hadoop.fs.s3a.access.key", "YOUR_AWS_ACCESS_KEY_ID")
spark.conf.set("spark.hadoop.fs.s3a.secret.key", "YOUR_AWS_SECRET_ACCESS_KEY")

In [12]:
df_with_partition = df.withColumn('year', year(col('tactic_start_date')))
df_with_partition = df_with_partition.withColumn('month', month(col('tactic_start_date')))
df_with_partition = df_with_partition.withColumn('day', dayofmonth(col('tactic_start_date')))

In [13]:
df_with_partition.write.partitionBy("year", "month", "day").parquet(f"s3a://{bucket_name}/promotion_history/")


Py4JJavaError: ignored