In [1]:
import findspark
import time

In [2]:
findspark.init()

In [3]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col, lit, round
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

In [4]:
spark = SparkSession\
        .builder\
        .master("local[2]")\
        .appName("Apache SQL and Hive")\
        .config("spark.memory.offHeap.enabled","true")\
        .config("spark.memory.offHeap.size","4g")\
        .enableHiveSupport()\
        .getOrCreate()
spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/21 16:24:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
df = spark.read.csv('zamowienia.txt', header=True, inferSchema=True, sep=';')

In [6]:
df.createOrReplaceTempView("zamowienia")

In [7]:
spark.catalog.listTables()

25/01/21 16:24:53 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/01/21 16:24:53 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/01/21 16:25:04 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/01/21 16:25:04 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.18.0.2
25/01/21 16:25:04 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


[Table(name='zamowienia_bucketed', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='zamowienia', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [8]:
df.show()

+------+----------+---------------+------------+-----------+
|  Kraj|Sprzedawca|Data zamowienia|idZamowienia|      Utarg|
+------+----------+---------------+------------+-----------+
|Polska|  Kowalski|     16.07.2003|       10248|  440,00 z|
|Polska|  Sowiäski|     10.07.2003|       10249|1 863,40 z|
|Niemcy|   Peacock|     12.07.2003|       10250|1 552,60 z|
|Niemcy| Leverling|     15.07.2003|       10251|  654,06 z|
|Niemcy|   Peacock|     11.07.2003|       10252|3 597,90 z|
|Niemcy| Leverling|     16.07.2003|       10253|1 444,80 z|
|Polska|  Kowalski|     23.07.2003|       10254|  556,62 z|
|Polska|     Dudek|     15.07.2003|       10255|2 490,50 z|
|Niemcy| Leverling|     17.07.2003|       10256|  517,80 z|
|Niemcy|   Peacock|     22.07.2003|       10257|1 119,90 z|
|Niemcy|   Davolio|     23.07.2003|       10258|1 614,88 z|
|Niemcy|   Peacock|     25.07.2003|       10259|  100,80 z|
|Niemcy|   Peacock|     29.07.2003|       10260|1 504,65 z|
|Niemcy|   Peacock|     

In [9]:
spark.sql('select Kraj from zamowienia').show()

+------+
|  Kraj|
+------+
|Polska|
|Polska|
|Niemcy|
|Niemcy|
|Niemcy|
|Niemcy|
|Polska|
|Polska|
|Niemcy|
|Niemcy|
|Niemcy|
|Niemcy|
|Niemcy|
|Niemcy|
|Niemcy|
|Polska|
|Polska|
|Niemcy|
|Niemcy|
|Niemcy|
+------+
only showing top 20 rows



In [10]:
df = df.withColumn("ID", monotonically_increasing_id())
df.show(10)

+------+----------+---------------+------------+-----------+---+
|  Kraj|Sprzedawca|Data zamowienia|idZamowienia|      Utarg| ID|
+------+----------+---------------+------------+-----------+---+
|Polska|  Kowalski|     16.07.2003|       10248|  440,00 z|  0|
|Polska|  Sowiäski|     10.07.2003|       10249|1 863,40 z|  1|
|Niemcy|   Peacock|     12.07.2003|       10250|1 552,60 z|  2|
|Niemcy| Leverling|     15.07.2003|       10251|  654,06 z|  3|
|Niemcy|   Peacock|     11.07.2003|       10252|3 597,90 z|  4|
|Niemcy| Leverling|     16.07.2003|       10253|1 444,80 z|  5|
|Polska|  Kowalski|     23.07.2003|       10254|  556,62 z|  6|
|Polska|     Dudek|     15.07.2003|       10255|2 490,50 z|  7|
|Niemcy| Leverling|     17.07.2003|       10256|  517,80 z|  8|
|Niemcy|   Peacock|     22.07.2003|       10257|1 119,90 z|  9|
+------+----------+---------------+------------+-----------+---+
only showing top 10 rows



In [11]:
df.write.bucketBy(16, 'ID').mode('overwrite').sortBy('Sprzedawca').saveAsTable('zamowienia_bucketed')

25/01/21 16:25:09 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
25/01/21 16:25:09 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
25/01/21 16:25:09 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/01/21 16:25:09 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist


In [12]:

!ls spark-warehouse/zamowienia_bucketed/*.parquet

spark-warehouse/zamowienia_bucketed/part-00000-ea843406-c83b-49f7-a732-f3f873b86478_00000.c000.snappy.parquet
spark-warehouse/zamowienia_bucketed/part-00000-ea843406-c83b-49f7-a732-f3f873b86478_00001.c000.snappy.parquet
spark-warehouse/zamowienia_bucketed/part-00000-ea843406-c83b-49f7-a732-f3f873b86478_00002.c000.snappy.parquet
spark-warehouse/zamowienia_bucketed/part-00000-ea843406-c83b-49f7-a732-f3f873b86478_00003.c000.snappy.parquet
spark-warehouse/zamowienia_bucketed/part-00000-ea843406-c83b-49f7-a732-f3f873b86478_00004.c000.snappy.parquet
spark-warehouse/zamowienia_bucketed/part-00000-ea843406-c83b-49f7-a732-f3f873b86478_00005.c000.snappy.parquet
spark-warehouse/zamowienia_bucketed/part-00000-ea843406-c83b-49f7-a732-f3f873b86478_00006.c000.snappy.parquet
spark-warehouse/zamowienia_bucketed/part-00000-ea843406-c83b-49f7-a732-f3f873b86478_00007.c000.snappy.parquet
spark-warehouse/zamowienia_bucketed/part-00000-ea843406-c83b-49f7-a732-f3f873b86478_00008.c000.snappy.parquet
spark-ware

In [13]:
spark.table('zamowienia_bucketed').show(10)

+------+----------+---------------+------------+-----------+---+
|  Kraj|Sprzedawca|Data zamowienia|idZamowienia|      Utarg| ID|
+------+----------+---------------+------------+-----------+---+
|Niemcy|  Callahan|     10.01.2004|       10402|2 713,50 z|154|
|Niemcy|  Callahan|     26.02.2004|       10452|2 018,50 z|204|
|Niemcy|  Callahan|     01.08.2004|       10614|  464,00 z|366|
|Niemcy|  Callahan|     12.08.2004|       10623|1 336,95 z|375|
|Niemcy|  Callahan|     11.09.2004|       10651|  397,80 z|403|
|Niemcy|  Callahan|     20.03.2005|       10955|   74,40 z|707|
|Niemcy|  Callahan|     31.03.2005|       10979|4 813,50 z|731|
|Niemcy|  Callahan|     13.04.2005|       10997|1 885,00 z|749|
|Niemcy|   Davolio|     13.12.2003|       10377|  863,60 z|129|
|Niemcy|   Davolio|     11.09.2004|       10655|  154,40 z|407|
+------+----------+---------------+------------+-----------+---+
only showing top 10 rows



In [14]:
spark.catalog.listTables()

[Table(name='zamowienia_bucketed', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='zamowienia', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [15]:
dfs=df

In [16]:
dfs = dfs.withColumn('Utarg', F.regexp_replace('Utarg', ',', '.'))

In [17]:
dfs = dfs.withColumn('Utarg', F.regexp_replace('Utarg', '[^0-9.,]', ''))

In [18]:
df = dfs.withColumn('Utarg', F.col('Utarg').cast(DoubleType()))

In [19]:
dfs.show(10)

+------+----------+---------------+------------+-------+---+
|  Kraj|Sprzedawca|Data zamowienia|idZamowienia|  Utarg| ID|
+------+----------+---------------+------------+-------+---+
|Polska|  Kowalski|     16.07.2003|       10248| 440.00|  0|
|Polska|  Sowiäski|     10.07.2003|       10249|1863.40|  1|
|Niemcy|   Peacock|     12.07.2003|       10250|1552.60|  2|
|Niemcy| Leverling|     15.07.2003|       10251| 654.06|  3|
|Niemcy|   Peacock|     11.07.2003|       10252|3597.90|  4|
|Niemcy| Leverling|     16.07.2003|       10253|1444.80|  5|
|Polska|  Kowalski|     23.07.2003|       10254| 556.62|  6|
|Polska|     Dudek|     15.07.2003|       10255|2490.50|  7|
|Niemcy| Leverling|     17.07.2003|       10256| 517.80|  8|
|Niemcy|   Peacock|     22.07.2003|       10257|1119.90|  9|
+------+----------+---------------+------------+-------+---+
only showing top 10 rows



In [20]:
df.write.bucketBy(4, 'ID').mode('overwrite').sortBy('ID').saveAsTable('zamowienia_bucketed')

In [21]:
start_time = time.time()
bucketed_df = spark.sql("SELECT Kraj, SUM(Utarg) AS total_utarg FROM zamowienia_bucketed GROUP BY Kraj")
bucketed_df.show()
bucketed_time = time.time() - start_time
print(f"Czas zapytania na danych wiaderkowanych: {bucketed_time:.4f} sekundy")

+------+------------------+
|  Kraj|       total_utarg|
+------+------------------+
|Niemcy|         894996.49|
|Polska|333330.91000000003|
+------+------------------+

Czas zapytania na danych wiaderkowanych: 0.6358 sekundy


In [22]:
start_time = time.time()
non_bucketed_df = df.groupBy("Kraj").agg(F.sum("Utarg").alias("total_utarg"))
non_bucketed_df.show()
non_bucketed_time = time.time() - start_time
print(f"Czas zapytania na danych niepodzielonych na wiaderka: {non_bucketed_time:.4f} sekundy")

+------+-----------------+
|  Kraj|      total_utarg|
+------+-----------------+
|Niemcy|894996.4900000002|
|Polska|333330.9099999999|
+------+-----------------+

Czas zapytania na danych niepodzielonych na wiaderka: 0.1429 sekundy


In [23]:
df.write.partitionBy("Kraj", "Sprzedawca").format("csv").save('zamowienia2')

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/opt/spark/work-dir/zamowienia2 already exists. Set mode as "overwrite" to overwrite the existing path.

In [None]:
start_time = time.time()

filtered_df = df.filter(df["Kraj"] == "Polska").groupBy("Sprzedawca").agg(F.sum("Utarg").alias("total_utarg"))
filtered_df.show()

non_partitioned_time = time.time() - start_time
print(f"Czas zapytania na danych oryginalnych: {non_partitioned_time:.4f} sekundy")
