In [None]:
# !pip install pyspark==3.3
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pyspark

In [None]:
# Show the installed PySpark version string
pyspark.__version__

'3.4.0'

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Get the active Spark session, or create one named "TEST" if none exists
spark = (
    SparkSession.builder
    .appName("TEST")
    .getOrCreate()
)
spark

#### Pusty obiekt RDD

In [None]:
# An RDD with no elements; used below as the source of an empty DataFrame
emptyRDD = (
    spark.sparkContext
    .emptyRDD()
)
emptyRDD

EmptyRDD[161] at emptyRDD at NativeMethodAccessorImpl.java:0

In [None]:
# Create a DataFrame from the empty RDD, using an explicit data schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Data schema: (column name, type) pairs; every column is nullable
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Name", StringType()),
        ("Age", IntegerType()),
        ("Job", StringType()),
        ("Salary", FloatType()),
    )
])

df = spark.createDataFrame(emptyRDD, schema)
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Job: string (nullable = true)
 |-- Salary: float (nullable = true)



In [None]:
# Convert RDD -> DataFrame with RDD.toDF, applying the same schema
emptyRDD.toDF(schema=schema).printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Job: string (nullable = true)
 |-- Salary: float (nullable = true)



In [None]:
# An empty Python list works as well as an empty RDD when a schema is given
spark.createDataFrame(data=[], schema=schema).printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Job: string (nullable = true)
 |-- Salary: float (nullable = true)



In [None]:
# pobranie CSV z URLa
!wget -O tips.csv https://eurocash-pyspark.s3.eu-central-1.amazonaws.com/tips.csv

--2023-06-19 09:16:48--  https://eurocash-pyspark.s3.eu-central-1.amazonaws.com/tips.csv
Resolving eurocash-pyspark.s3.eu-central-1.amazonaws.com (eurocash-pyspark.s3.eu-central-1.amazonaws.com)... 3.5.134.169
Connecting to eurocash-pyspark.s3.eu-central-1.amazonaws.com (eurocash-pyspark.s3.eu-central-1.amazonaws.com)|3.5.134.169|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8188 (8.0K) [text/csv]
Saving to: ‘tips.csv’


2023-06-19 09:16:48 (226 MB/s) - ‘tips.csv’ saved [8188/8188]



In [None]:
# Load the CSV with no options: the header row is treated as data and
# every column comes back as a string named _c0, _c1, ...
df = spark.read.format("csv").load("tips.csv")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



In [None]:
# First row is the header, column types are inferred, fields are comma-separated
df = spark.read.csv(
    "tips.csv",
    header=True,
    inferSchema=True,
    sep=",",
)
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [None]:
# Preview the first 10 rows
df.show(10)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
+----------+----+------+------+---+------+----+
only showing top 10 rows



In [None]:
# Column names, taken from the DataFrame schema
df.schema.names

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [None]:
# Enforce an explicit data schema instead of inferring it

schema = StructType([
    StructField("total_bill", FloatType(), True),
    StructField("tip", FloatType(), True),
    StructField("sex", StringType(), True),
    StructField("smoker", StringType(), True),
    StructField("day", StringType(), True),
    StructField("time", StringType(), True),
    StructField("size", IntegerType(), True),
])

# PERMISSIVE mode keeps malformed records, setting unparseable fields to null
df = spark.read.csv(
    "tips.csv",
    schema=schema,
    header=True,
    mode="PERMISSIVE",
)

df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [None]:
# Cast the `size` column from integer to float
from pyspark.sql.functions import col

df = df.withColumn(
    "size",
    col("size").cast("float"),
)
df.printSchema()

root
 |-- total_bill: float (nullable = true)
 |-- tip: float (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: float (nullable = true)



In [None]:
# Show the first 20 rows; `size` now displays as a float
df.show(n=20)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner| 2.0|
|     10.34|1.66|  Male|    No|Sun|Dinner| 3.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner| 3.0|
|     23.68|3.31|  Male|    No|Sun|Dinner| 2.0|
|     24.59|3.61|Female|    No|Sun|Dinner| 4.0|
|     25.29|4.71|  Male|    No|Sun|Dinner| 4.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner| 2.0|
|     26.88|3.12|  Male|    No|Sun|Dinner| 4.0|
|     15.04|1.96|  Male|    No|Sun|Dinner| 2.0|
|     14.78|3.23|  Male|    No|Sun|Dinner| 2.0|
|     10.27|1.71|  Male|    No|Sun|Dinner| 2.0|
|     35.26| 5.0|Female|    No|Sun|Dinner| 4.0|
|     15.42|1.57|  Male|    No|Sun|Dinner| 2.0|
|     18.43| 3.0|  Male|    No|Sun|Dinner| 4.0|
|     14.83|3.02|Female|    No|Sun|Dinner| 2.0|
|     21.58|3.92|  Male|    No|Sun|Dinner| 2.0|
|     10.33|1.67|Female|    No|Sun|Dinner| 3.0|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [None]:
# Save the DataFrame as CSV with a header row, one sub-directory per `smoker` value
df.write.partitionBy("smoker").csv(
    "tips-out",
    mode="overwrite",
    header=True,
    sep=",",
)

In [None]:
import glob

In [None]:
# List every CSV part-file written under tips-out/, searching sub-directories too
list(glob.iglob("tips-out/**/*.csv", recursive=True))

['tips-out/smoker=Yes/part-00000-9a390b12-6a1e-41e1-9696-a66deb36ec20.c000.csv',
 'tips-out/smoker=No/part-00000-9a390b12-6a1e-41e1-9696-a66deb36ec20.c000.csv']

### Ładowanie plików parquet

In [None]:
!mkdir data
!wget -O data/user.parquet https://eurocash-pyspark.s3.eu-central-1.amazonaws.com/userdata1.parquet

mkdir: cannot create directory ‘data’: File exists
--2023-06-19 09:16:52--  https://eurocash-pyspark.s3.eu-central-1.amazonaws.com/userdata1.parquet
Resolving eurocash-pyspark.s3.eu-central-1.amazonaws.com (eurocash-pyspark.s3.eu-central-1.amazonaws.com)... 3.5.134.169
Connecting to eurocash-pyspark.s3.eu-central-1.amazonaws.com (eurocash-pyspark.s3.eu-central-1.amazonaws.com)|3.5.134.169|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 113629 (111K) [binary/octet-stream]
Saving to: ‘data/user.parquet’


2023-06-19 09:16:52 (5.67 MB/s) - ‘data/user.parquet’ saved [113629/113629]



In [None]:
# Load the downloaded Parquet file; its schema is stored in the file itself
df = spark.read.format("parquet").load("data/user.parquet")
df.printSchema()

root
 |-- registration_dttm: timestamp (nullable = true)
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- country: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- title: string (nullable = true)
 |-- comments: string (nullable = true)



In [None]:
# Preview the first 5 user rows
df.show(5)

+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|  registration_dttm| id|first_name|last_name|               email|gender|    ip_address|              cc|     country|birthdate|   salary|               title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|   1.197.201.2|6759521864920116|   Indonesia| 3/8/1971| 49756.53|    Internal Auditor|   1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male|218.111.175.34|                |      Canada|1/16/1968|150280.17|       Accountant IV|        |
|2016-02-03 01:09:31|  3|    Evelyn|   Morgan|emorgan2@altervis...|Female|  7.161.136.94|6767119071901597|      Russia| 2/1/1960|144972.51| Structural

In [None]:
# Number of rows; count() is an action, so this runs a Spark job
df.count()

1000

In [None]:
# Column names, read field by field from the schema
[field.name for field in df.schema.fields]

['registration_dttm',
 'id',
 'first_name',
 'last_name',
 'email',
 'gender',
 'ip_address',
 'cc',
 'country',
 'birthdate',
 'salary',
 'title',
 'comments']

In [None]:
# (column name, simple type name) pairs, built from the schema
[(str(field.name), field.dataType.simpleString()) for field in df.schema.fields]

[('registration_dttm', 'timestamp'),
 ('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('email', 'string'),
 ('gender', 'string'),
 ('ip_address', 'string'),
 ('cc', 'string'),
 ('country', 'string'),
 ('birthdate', 'string'),
 ('salary', 'double'),
 ('title', 'string'),
 ('comments', 'string')]

In [None]:
# Schema as a JSON-compatible Python dict (type, fields with name/type/nullable/metadata)
df.schema.jsonValue()

{'type': 'struct',
 'fields': [{'name': 'registration_dttm',
   'type': 'timestamp',
   'nullable': True,
   'metadata': {}},
  {'name': 'id', 'type': 'integer', 'nullable': True, 'metadata': {}},
  {'name': 'first_name', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'last_name', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'email', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'gender', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'ip_address', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'cc', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'country', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'birthdate', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'salary', 'type': 'double', 'nullable': True, 'metadata': {}},
  {'name': 'title', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'comments', 'type': 'string', 'nullable': T

In [None]:
# Write Parquet to userdata/, partitioned into gender=.../country=... sub-directories
df.write.parquet(
    "userdata/",
    mode="overwrite",
    partitionBy=["gender", "country"],
)

In [None]:
# Show the first 20 rows, with long cell values truncated
df.show(20, truncate=True)

+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|  registration_dttm| id|first_name|last_name|               email|gender|     ip_address|                 cc|             country| birthdate|   salary|               title|            comments|
+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|    1.197.201.2|   6759521864920116|           Indonesia|  3/8/1971| 49756.53|    Internal Auditor|               1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male| 218.111.175.34|                   |              Canada| 1/16/1968|150280.17|       Accountant IV|                    |
|2016-02-03 01:09:31|  3|

In [None]:
# Read one partition back. Point at the partition *directory*, not a specific
# part-file: part-file names contain a UUID that changes on every write, so a
# hard-coded file name breaks once the overwrite of userdata/ above is re-run.
# The path is relative, like the write path "userdata/".
# Because the path is below the table root, partition discovery starts here and
# `gender` / `country` are not added as columns (same result as reading the file).
file = "userdata/gender=Female/country=Argentina"
df = spark.read.parquet(file)
df.show()

+-------------------+---+----------+----------+--------------------+---------------+----------------+----------+---------+--------------------+--------------------+
|  registration_dttm| id|first_name| last_name|               email|     ip_address|              cc| birthdate|   salary|               title|            comments|
+-------------------+---+----------+----------+--------------------+---------------+----------------+----------+---------+--------------------+--------------------+
|2016-02-03 14:44:16| 91|   Theresa|  Gonzalez| tgonzalez2i@nih.gov|237.106.229.219|                | 8/10/1970| 47723.61|    Product Engineer|                    |
|2016-02-03 19:50:33|276| Christine|Cunningham|ccunningham7n@wp.com|  223.92.43.118|5602250470905243| 2/26/1973|218124.48|Compensation Analyst|../../../../../.....|
|2016-02-03 13:25:04|280|    Andrea|  Mcdonald|amcdonald7r@opens...|    35.61.115.2|4917526443727555| 2/27/1993|102882.64|            Operator|                    |
|2016-02-0