### Zadanie 1

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, month, upper, lit
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

spark = SparkSession.builder.appName("Zamowienia").getOrCreate()

file_path = "zamowienia.txt"
df = spark.read.option("header", True).option("delimiter", ";").csv(file_path)
df_cleaned = df \
    .withColumn("Sprzedawca", regexp_replace(col("Sprzedawca"), "ä", "a")) \
    .withColumn("Utarg", regexp_replace(col("Utarg"), "[zł, ]", "").cast("float")) \
    .withColumn("idZamowienia", col("idZamowienia").cast("int"))

### Zadanie 2

In [21]:
df_bucketed = df_cleaned.repartition(10, "Kraj")
df_bucketed.createOrReplaceTempView("bucketed_data")
query_bucketed = "SELECT Kraj, SUM(Utarg) as TotalRevenue FROM bucketed_data GROUP BY Kraj"
spark.sql(query_bucketed).show()

+------+------------+
|  Kraj|TotalRevenue|
+------+------------+
|Niemcy| 8.9499649E7|
|Polska| 3.3333091E7|
+------+------------+



In [22]:
output_path_country = "output/partitioned_by_country"
df_cleaned.write.partitionBy("Kraj").csv(output_path_country, mode="overwrite")

output_path_seller = "output/partitioned_by_seller"
df_cleaned.write.partitionBy("Sprzedawca").csv(output_path_seller, mode="overwrite")

In [24]:
import time


start_time_original = time.time()
query_original = "SELECT Kraj, SUM(Utarg) as TotalRevenue FROM original_data WHERE Kraj = 'Polska' GROUP BY Kraj"
result_original = spark.sql(query_original).collect()
end_time_original = time.time()

start_time_partitioned = time.time()
query_partitioned = "SELECT Kraj, SUM(Utarg) as TotalRevenue FROM partitioned_data WHERE Kraj = 'Polska' GROUP BY Kraj"
result_partitioned = spark.sql(query_partitioned).collect()
end_time_partitioned = time.time()

print("Wyniki na danych oryginalnych:", result_original)
print("Czas wykonania na danych oryginalnych:", end_time_original - start_time_original, "sekund")

print("Wyniki na danych partycjonowanych:", result_partitioned)
print("Czas wykonania na danych partycjonowanych:", end_time_partitioned - start_time_partitioned, "sekund")


Wyniki na danych oryginalnych: [Row(Kraj='Polska', TotalRevenue=33333091.0)]
Czas wykonania na danych oryginalnych: 0.8237721920013428 sekund
Wyniki na danych partycjonowanych: [Row(Kraj='Polska', TotalRevenue=33333091.0)]
Czas wykonania na danych partycjonowanych: 0.5613882541656494 sekund


### Zadanie 3

In [32]:
from pyspark.sql.functions import month, upper, lit
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

subset1 = df_cleaned.withColumn("month", month("Data zamowienia"))
subset1.createOrReplaceTempView("subset1")

subset2 = df_cleaned.withColumn("Netto", col("Utarg") / 1.23)
subset2.write.parquet("output/subset2.parquet", mode="overwrite")

schema_subset3 = StructType([
    StructField("Kraj", StringType(), True),
    StructField("Sprzedawca", StringType(), True),
    StructField("Data zamowienia", StringType(), True),
    StructField("idZamowienia", IntegerType(), True),
    StructField("Utarg", FloatType(), True)
])
subset3 = df_cleaned.withColumn("Sprzedawca", upper(col("Sprzedawca")))
subset3.write.csv("output/subset3.csv", mode="overwrite", header=False)

subset4 = df_cleaned.withColumn("waluta", lit("PLN"))
subset4.write.json("output/subset4.json", mode="overwrite")


+---------------+------+----------+--------+------------+------+
|Data zamowienia|  Kraj|Sprzedawca|   Utarg|idZamowienia|waluta|
+---------------+------+----------+--------+------------+------+
|     16.07.2003|Polska|  Kowalski| 44000.0|       10248|   PLN|
|     10.07.2003|Polska|  Sowiaski|186340.0|       10249|   PLN|
|     12.07.2003|Niemcy|   Peacock|155260.0|       10250|   PLN|
|     15.07.2003|Niemcy| Leverling| 65406.0|       10251|   PLN|
|     11.07.2003|Niemcy|   Peacock|359790.0|       10252|   PLN|
|     16.07.2003|Niemcy| Leverling|144480.0|       10253|   PLN|
|     23.07.2003|Polska|  Kowalski| 55662.0|       10254|   PLN|
|     15.07.2003|Polska|     Dudek|249050.0|       10255|   PLN|
|     17.07.2003|Niemcy| Leverling| 51780.0|       10256|   PLN|
|     22.07.2003|Niemcy|   Peacock|111990.0|       10257|   PLN|
|     23.07.2003|Niemcy|   Davolio|161488.0|       10258|   PLN|
|     25.07.2003|Niemcy|   Peacock| 10080.0|       10259|   PLN|
|     29.07.2003|Niemcy| 