In [1]:
from dotenv import load_dotenv
import os, pandas as pd, ast, json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
import numpy as np

In [2]:
# Load variables from a local .env file into the process environment
# (APP_NAME and DATASET_PATH, read in the next cell, are expected to be defined there).
load_dotenv()

True

In [3]:
# Runtime settings taken from the environment / .env file.
app_name = os.getenv('APP_NAME')
dataset_path = os.getenv('DATASET_PATH')

# Fail fast with a clear message: a missing DATASET_PATH would otherwise only
# surface later as an opaque TypeError from os.path.join(None, ...).
_missing_env = [
    name
    for name, value in (('APP_NAME', app_name), ('DATASET_PATH', dataset_path))
    if not value
]
if _missing_env:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(_missing_env)}; "
        "define them in the environment or in the .env file."
    )

In [4]:
# Local Spark session using every core on this machine; scheme-less paths
# resolve against the local filesystem (defaultFS = file:///) rather than HDFS.
spark = (
    SparkSession.builder
    .appName(app_name)
    .master("local[*]")
    .config("spark.hadoop.fs.defaultFS", "file:///")
    .config("spark.hadoop.fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
    .getOrCreate()
)

In [5]:
# Both CSVs have a header row. Using the quote character as the escape makes
# doubled quotes ("") inside quoted fields read as literal quotes, which the
# JSON-encoded columns flattened later presumably rely on.
csv_options = {"header": True, "quote": "\"", "escape": "\""}
train = spark.read.csv(os.path.join(dataset_path, "train_v2.csv"), **csv_options)
test = spark.read.csv(os.path.join(dataset_path, "test_v2.csv"), **csv_options)

In [6]:
# Preview the first 20 raw training rows without cutting off the long columns.
train.show(n=20, truncate=False)

+---------------+--------------------------------------------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
# Preview the first 20 raw test rows without cutting off the long columns.
test.show(n=20, truncate=False)

+---------------+------------------------------------------+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
# Columns removed from both splits. `hits` is dropped (temporarily) because
# it is complicated to flatten; the reason for `socialEngagementType` is not
# recorded here — TODO document it.
columns_to_drop = ["hits", "socialEngagementType"]
train = train.drop(*columns_to_drop)
test = test.drop(*columns_to_drop)

In [9]:
from modules.preprocessing import *

In [10]:
# Columns holding nested records that json_to_col (modules.preprocessing)
# expands into prefixed flat columns, e.g. device -> device_browser.
nested_json_columns = ["customDimensions", "device", "geoNetwork", "totals", "trafficSource"]
train = json_to_col(train, nested_json_columns)
test = json_to_col(test, nested_json_columns)

ArrayType(StructType([StructField('index', StringType(), True), StructField('value', StringType(), True)]), True)
StructType([StructField('browser', StringType(), True), StructField('browserVersion', StringType(), True), StructField('browserSize', StringType(), True), StructField('operatingSystem', StringType(), True), StructField('operatingSystemVersion', StringType(), True), StructField('isMobile', BooleanType(), True), StructField('mobileDeviceBranding', StringType(), True), StructField('mobileDeviceModel', StringType(), True), StructField('mobileInputSelector', StringType(), True), StructField('mobileDeviceInfo', StringType(), True), StructField('mobileDeviceMarketingName', StringType(), True), StructField('flashVersion', StringType(), True), StructField('language', StringType(), True), StructField('screenColors', StringType(), True), StructField('screenResolution', StringType(), True), StructField('deviceCategory', StringType(), True)])
StructType([StructField('continent', StringT

In [11]:
# Inspect both splits after flattening (train first, then test).
for split_frame in (train, test):
    split_frame.show()

+---------------+--------+-------------------+----------+-----------+--------------+----------------------+----------------------+--------------+---------------------+--------------------+----------------------+-----------------------------+---------------+---------------------------+------------------------+--------------------------+-----------------------+--------------------------------+--------------------+--------------------+--------------------+-----------------------+---------------------+--------------------+-----------------------+------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------------+-------------+-----------+----------------+--------------+----------------+------------------------+-----------------+-------------------+-------------------------+------------------------------+----------------------+--------------------+----------------

In [12]:
# Apply the project's missing-value filling (modules.preprocessing.fill_na)
# to each split; train is processed before test.
train, test = fill_na(train), fill_na(test)

In [13]:
# Inspect both splits after missing-value filling (train first, then test).
for filled_split in (train, test):
    filled_split.show()

+---------------+----------+-------------------+----------+-----------+--------------+----------------------+----------------------+--------------+---------------------+------------------+----------------------+-----------------------------+---------------+---------------------------+------------------------+--------------------------+-----------------------+--------------------------------+-------------------+---------------+-------------------+-----------------------+---------------------+--------------------+-----------------------+------------------+-----------------+--------------------+---------------+-----------------+------------------------+-------------------+--------------------+--------------------------+-------------+-----------+----------------+--------------+----------------+------------------------+-----------------+-------------------+-------------------------+------------------------------+----------------------+--------------------+--------------------+--------------

In [14]:
# Reorder test's columns to match train exactly, so the two splits can be stacked.
test = test.select(*train.columns)

In [15]:
print(train.columns)
print(test.columns)
# Combining train and test below relies on identical column names in identical
# order; check it instead of comparing the printed lists by eye.
assert train.columns == test.columns, "train/test column mismatch"

['channelGrouping', 'date', 'fullVisitorId', 'visitId', 'visitNumber', 'visitStartTime', 'customDimensions_index', 'customDimensions_value', 'device_browser', 'device_browserVersion', 'device_browserSize', 'device_operatingSystem', 'device_operatingSystemVersion', 'device_isMobile', 'device_mobileDeviceBranding', 'device_mobileDeviceModel', 'device_mobileInputSelector', 'device_mobileDeviceInfo', 'device_mobileDeviceMarketingName', 'device_flashVersion', 'device_language', 'device_screenColors', 'device_screenResolution', 'device_deviceCategory', 'geoNetwork_continent', 'geoNetwork_subContinent', 'geoNetwork_country', 'geoNetwork_region', 'geoNetwork_metro', 'geoNetwork_city', 'geoNetwork_cityId', 'geoNetwork_networkDomain', 'geoNetwork_latitude', 'geoNetwork_longitude', 'geoNetwork_networkLocation', 'totals_visits', 'totals_hits', 'totals_pageviews', 'totals_bounces', 'totals_newVisits', 'totals_sessionQualityDim', 'totals_timeOnSite', 'totals_transactions', 'totals_transactionRevenue

In [16]:
# Stack train and test into one frame. unionByName matches columns by name,
# so a column-order difference cannot silently shift values into the wrong
# column (plain union() is purely positional).
df = train.unionByName(test)

In [17]:
# Project helper from modules.preprocessing. Judging by its name and the list it
# prints, it presumably removes columns holding a single distinct value — confirm.
df = drop_single_value_columns(df)

['device_browserVersion', 'device_browserSize', 'device_operatingSystemVersion', 'device_mobileDeviceBranding', 'device_mobileDeviceModel', 'device_mobileInputSelector', 'device_mobileDeviceInfo', 'device_mobileDeviceMarketingName', 'device_flashVersion', 'device_language', 'device_screenColors', 'device_screenResolution', 'geoNetwork_cityId', 'geoNetwork_latitude', 'geoNetwork_longitude', 'geoNetwork_networkLocation', 'totals_visits', 'trafficSource_adwordsClickInfo_criteriaParameters']


In [18]:
# Preview the first 20 rows of the combined train + test frame.
df.show(20)

+---------------+----------+-------------------+----------+-----------+--------------+----------------------+----------------------+--------------+----------------------+---------------+---------------------+--------------------+-----------------------+------------------+-----------------+--------------------+---------------+------------------------+-----------+----------------+--------------+----------------+------------------------+-----------------+-------------------+-------------------------+------------------------------+----------------------+--------------------+--------------------+---------------------+-----------------------------------+-----------------------------------+------------------------------------+--------------------------------------------+----------------------------------------+--------------------------+--------------------------+-----------------------+
|channelGrouping|      date|      fullVisitorId|   visitId|visitNumber|visitStartTime|customDimensions_ind

In [19]:
# Number of columns remaining after the single-value columns were dropped.
remaining_column_count = len(df.columns)
print(remaining_column_count)

40


In [20]:
# Show the final column names, types and nullability of the combined frame.
df.printSchema()

root
 |-- channelGrouping: string (nullable = true)
 |-- date: date (nullable = true)
 |-- fullVisitorId: string (nullable = true)
 |-- visitId: string (nullable = true)
 |-- visitNumber: string (nullable = true)
 |-- visitStartTime: string (nullable = true)
 |-- customDimensions_index: integer (nullable = true)
 |-- customDimensions_value: string (nullable = false)
 |-- device_browser: string (nullable = true)
 |-- device_operatingSystem: string (nullable = true)
 |-- device_isMobile: boolean (nullable = true)
 |-- device_deviceCategory: string (nullable = true)
 |-- geoNetwork_continent: string (nullable = true)
 |-- geoNetwork_subContinent: string (nullable = true)
 |-- geoNetwork_country: string (nullable = true)
 |-- geoNetwork_region: string (nullable = true)
 |-- geoNetwork_metro: string (nullable = true)
 |-- geoNetwork_city: string (nullable = true)
 |-- geoNetwork_networkDomain: string (nullable = true)
 |-- totals_hits: integer (nullable = true)
 |-- totals_pageviews: intege

Export: write the combined train + test data to a single CSV file, in chunks

In [21]:
from pyspark.sql.functions import row_number

# Export the combined frame to one local CSV, pulling at most `chunk_size`
# rows to the driver (as pandas) at a time.
chunk_size = 5000
# 1-based `index` value to start from. 1 starts a fresh file (truncated, with
# a header). A larger value resumes an interrupted export: rows are appended to
# the existing file and the header is NOT repeated. 1085000 continues the
# earlier partial run, which already wrote every index below 1085000.
start_row = 1085000
output_file = os.path.join("data", '_data.csv')

# Ordering by a constant gives an arbitrary numbering, and a window without
# partitionBy gathers all rows into one partition. Cache the numbered frame so
# the row -> index assignment is computed once and reused by every chunk query
# below, instead of being recomputed (and possibly reshuffled) per chunk.
window_spec = Window.orderBy(F.lit(1))
_df = df.withColumn("index", row_number().over(window_spec)).cache()

total_rows = _df.count()  # also materializes the cache
os.makedirs(os.path.dirname(output_file), exist_ok=True)

first_index = max(start_row, 1)
fresh_export = first_index == 1
# row_number() is 1-based, so the last row has index == total_rows; iterate up
# to total_rows inclusive so that row is not skipped.
for i in range(first_index, total_rows + 1, chunk_size):
    df_part = _df.filter((F.col("index") >= i) & (F.col("index") < i + chunk_size))
    df_pandas = df_part.drop("index").toPandas()
    # Count the collected pandas rows instead of launching another Spark job.
    print(len(df_pandas))
    # Only the first chunk of a fresh export truncates the file and writes the
    # header; every other chunk (including the first chunk of a resumed run)
    # appends without a header.
    starts_new_file = fresh_export and i == first_index
    df_pandas.to_csv(
        output_file,
        mode="w" if starts_new_file else "a",
        header=starts_new_file,
        index=False,
    )

    print(f'Chunk {i} to {i + chunk_size} written to CSV.')

_df.unpersist()

5000
Chunk 1085000 to 1090000 written to CSV.
5000
Chunk 1090000 to 1095000 written to CSV.
5000
Chunk 1095000 to 1100000 written to CSV.
5000
Chunk 1100000 to 1105000 written to CSV.
5000
Chunk 1105000 to 1110000 written to CSV.
5000
Chunk 1110000 to 1115000 written to CSV.
5000
Chunk 1115000 to 1120000 written to CSV.
5000
Chunk 1120000 to 1125000 written to CSV.
5000
Chunk 1125000 to 1130000 written to CSV.
5000
Chunk 1130000 to 1135000 written to CSV.
5000
Chunk 1135000 to 1140000 written to CSV.
5000
Chunk 1140000 to 1145000 written to CSV.
5000
Chunk 1145000 to 1150000 written to CSV.
5000
Chunk 1150000 to 1155000 written to CSV.
5000
Chunk 1155000 to 1160000 written to CSV.
5000
Chunk 1160000 to 1165000 written to CSV.
5000
Chunk 1165000 to 1170000 written to CSV.
5000
Chunk 1170000 to 1175000 written to CSV.
5000
Chunk 1175000 to 1180000 written to CSV.
5000
Chunk 1180000 to 1185000 written to CSV.
5000
Chunk 1185000 to 1190000 written to CSV.
5000
Chunk 1190000 to 1195000 writ