In [None]:
from faker import Faker 
import pandas as pd 
from tqdm import tqdm

fake = Faker()
num_records = 3*10**5
data = []
for _ in tqdm(range(num_records)):
    record = {
        'user_id': fake.uuid4(),
        'username': fake.name(),
        'email': fake.email(),
        'transaction_amount': fake.random_int(min=10, max=1000),
        'transaction_date': fake.date_between(start_date='-1y', end_date='today'), 
        'product_quantity': fake.random_int(min=1, max=10)
    }
    data.append(record)

df = pd.DataFrame(data)
df.to_csv('synthetic_big_data.csv', index=False)

In [5]:
import pandas as pd 
from pyspark.sql import SparkSession 
from pyspark.sql import functions as F 
import time 
from IPython.display import display, Markdown 

In [7]:
GCS_CSV_PATH = "gs://dummy-faker-data/synthetic_big_data.csv"
GCS_OUTPUT_PATH = "gs://dummy-faker-data/avg_transaction_value_spark"
COLUMN_AMOUNT = 'transaction_amount' 
COLUMN_QUANTITY = "product_quantity" 

spark = (
    SparkSession.builder
    .appName("SparkVsPandasGCSUpdate") 
    .getOrCreate()
)
print("spark session initialized") 
timing_results = {}

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/03 14:35:53 INFO SparkEnv: Registering MapOutputTracker
26/02/03 14:35:53 INFO SparkEnv: Registering BlockManagerMaster
26/02/03 14:35:53 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
26/02/03 14:35:53 INFO SparkEnv: Registering OutputCommitCoordinator


spark session initialized


In [13]:
start_time_spark = time.time()

# creating spark df 
spark_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true") 
    .csv(GCS_CSV_PATH)
)
spark_df_count = spark_df.count() # trigger an action to force the distributed read and measure true loading time 
time_taken_spark_read = time.time() - start_time_spark 
timing_results['spark_read'] = time_taken_spark_read 
print(time_taken_spark_read)

                                                                                

3.6184587478637695


In [12]:
start_time_spark_trans = time.time()

filtered_spark_df = spark_df.filter(spark_df[COLUMN_AMOUNT] > 500)
spark_with_total_value = filtered_spark_df.withColumn(
    "total_value", 
    F.col(COLUMN_AMOUNT) * F.col(COLUMN_QUANTITY)
)

spark_result = spark_with_total_value.groupBy('username').agg(
    F.mean('total_value').alias('avg_total_value')
)
spark_result.show(5, truncate = False)

(spark_result
    .write
    .mode('overwrite')
    .csv(GCS_OUTPUT_PATH)
)   

time_taken_spark_trans = time.time() - start_time_spark_trans 
timing_results['spark_trans'] = time_taken_spark_trans 
print(time_taken_spark_trans)

                                                                                

+-----------------+---------------+
|username         |avg_total_value|
+-----------------+---------------+
|Elizabeth Terrell|4100.0         |
|Matthew Swanson  |7656.0         |
|Heather Diaz     |3439.0         |
|Leslie Lee       |3478.0         |
|Blake Moore      |5967.0         |
+-----------------+---------------+
only showing top 5 rows



                                                                                

5.503426790237427


In [14]:
start_time_pandas = time.time()

# creating pandas df 
pandas_df = pd.read_csv(GCS_CSV_PATH)

time_taken_pandas_read = time.time() - start_time_pandas 
timing_results['pandas_read'] = time_taken_pandas_read 
print(time_taken_pandas_read)

1.1676914691925049


In [16]:
start_time_pandas = time.time()

# creating pandas df 
filtered_pandas_df = pandas_df[pandas_df[COLUMN_AMOUNT] > 500] 
filtered_pandas_df['total_value'] = filtered_pandas_df[COLUMN_AMOUNT] * filtered_pandas_df[COLUMN_QUANTITY] 
pandas_result = filtered_pandas_df.groupby('username')['total_value'].mean().reset_index()

time_taken_pandas_trans = time.time() - start_time_pandas 
timing_results['pandas_read'] = time_taken_pandas_trans 
print(time_taken_pandas_trans)

0.11241507530212402


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  filtered_pandas_df['total_value'] = filtered_pandas_df[COLUMN_AMOUNT] * filtered_pandas_df[COLUMN_QUANTITY]
