In [23]:
from pyspark.sql import SparkSession

In [24]:
spark = SparkSession.builder.appName("DataFrame - Operations").getOrCreate()

In [25]:
file_path = "data/SampleData/stocks.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

In [26]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)



In [27]:
selected_cols = df.select("name", "category", "price")
selected_cols.show(5)

+----------+-----------+-------+
|      name|   category|  price|
+----------+-----------+-------+
|    iPhone|Electronics| 899.99|
|   Macbook|Electronics|1299.99|
|      iPad|Electronics| 499.99|
|Samsung TV|Electronics| 799.99|
|     LG TV|Electronics| 699.99|
+----------+-----------+-------+
only showing top 5 rows



In [28]:
other_df = df.select('id', 'category').limit(6)
joined_data = df.join(other_df, 'id', 'inner')
joined_data.show()

+---+----------+-----------+--------+-------+-----------+
| id|      name|   category|quantity|  price|   category|
+---+----------+-----------+--------+-------+-----------+
|  1|    iPhone|Electronics|      10| 899.99|Electronics|
|  2|   Macbook|Electronics|       5|1299.99|Electronics|
|  3|      iPad|Electronics|      15| 499.99|Electronics|
|  4|Samsung TV|Electronics|       8| 799.99|Electronics|
|  5|     LG TV|Electronics|      10| 699.99|Electronics|
|  6|Nike Shoes|   Clothing|      30|  99.99|   Clothing|
+---+----------+-----------+--------+-------+-----------+



In [29]:
# Sort by column
sorted_data = df.orderBy('price')
sorted_data.show(5)

+---+--------------+--------+--------+-----+
| id|          name|category|quantity|price|
+---+--------------+--------+--------+-----+
| 13|       Bananas|    Food|     150| 0.25|
| 12|        Apples|    Food|     100|  0.5|
| 14|       Oranges|    Food|     120| 0.75|
| 15|Chicken Breast|    Food|      50| 3.99|
| 16| Salmon Fillet|    Food|      30| 5.99|
+---+--------------+--------+--------+-----+
only showing top 5 rows



In [30]:
# Sort by multi columns
from pyspark.sql.functions import col, desc
sorted_data = df.orderBy(col('quantity').desc(), col('id').desc())
sorted_data.show(5)

+---+--------------+--------+--------+-----+
| id|          name|category|quantity|price|
+---+--------------+--------+--------+-----+
| 13|       Bananas|    Food|     150| 0.25|
| 14|       Oranges|    Food|     120| 0.75|
| 12|        Apples|    Food|     100|  0.5|
| 29|       T-shirt|Clothing|      50|14.99|
| 15|Chicken Breast|    Food|      50| 3.99|
+---+--------------+--------+--------+-----+
only showing top 5 rows



In [31]:
distinct_rows = df.select('category').distinct()
distinct_rows.show()

+-----------+
|   category|
+-----------+
|       Food|
|     Sports|
|Electronics|
|   Clothing|
|  Furniture|
|Accessories|
+-----------+



In [32]:
dropped_cols = df.drop('quantity', 'category')
dropped_cols.show(5)

+---+----------+-------+
| id|      name|  price|
+---+----------+-------+
|  1|    iPhone| 899.99|
|  2|   Macbook|1299.99|
|  3|      iPad| 499.99|
|  4|Samsung TV| 799.99|
|  5|     LG TV| 699.99|
+---+----------+-------+
only showing top 5 rows



In [33]:
df_new_cols = df.withColumn("revenue", df.quantity * df.price)
df_new_cols.show(5)

+---+----------+-----------+--------+-------+-------+
| id|      name|   category|quantity|  price|revenue|
+---+----------+-----------+--------+-------+-------+
|  1|    iPhone|Electronics|      10| 899.99| 8999.9|
|  2|   Macbook|Electronics|       5|1299.99|6499.95|
|  3|      iPad|Electronics|      15| 499.99|7499.85|
|  4|Samsung TV|Electronics|       8| 799.99|6399.92|
|  5|     LG TV|Electronics|      10| 699.99| 6999.9|
+---+----------+-----------+--------+-------+-------+
only showing top 5 rows



In [35]:
df_with_alias = df.withColumnsRenamed({"quantity": "p_quantity", "price": "p_price"})
df_with_alias

DataFrame[id: int, name: string, category: string, p_quantity: int, p_price: double]

## Đo hiệu năng

In [36]:
import pandas as pd
import time

file_path = "data/SampleData/Online_Retail.csv"

# C1: Đo hiệu năng với Pandas
start_time_pandas = time.time()
try:
    pandas_df = pd.read_csv(file_path)
    pandas_process_time = time.time() - start_time_pandas
except Exception as e:
    pandas_process_time = None
    pandas_error = str(e)

# C2: Đo hiệu năng với Spark
start_time_spark = time.time()
try:
    spark_df = spark.read.csv(file_path, header=True, inferSchema=True)
    spark_process_time = time.time() - start_time_spark
except Exception as e:
    spark_process_time = None
    spark_error = str(e)

In [37]:
results = {
    "Pandas Read Time (s)": pandas_process_time if pandas_process_time else pandas_error,
    "Spark Read Time (s)": spark_process_time if spark_process_time else spark_error
}
results 

{'Pandas Read Time (s)': "'utf-8' codec can't decode byte 0xa3 in position 227179: invalid start byte",
 'Spark Read Time (s)': 0.34600186347961426}