#Setup

In [4]:
pip install pyspark



In [5]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second,
    count, min, max, avg, sum, udf, when
)
from datetime import datetime

spark = SparkSession.builder.appName("a").getOrCreate()

# 2. Początki z PySpark

## 2.1. Tworzenie DataFrame

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('spark').getOrCreate()

In [None]:
df = spark.createDataFrame(
    [
        ("Marcelina", "Tetlak", 32),
        ("Anna", "Radomska", 42),

    ],
    ['first', 'last', 'age']
)

In [None]:
df.show()

+---------+--------+---+
|    first|    last|age|
+---------+--------+---+
|Marcelina|  Tetlak| 32|
|     Anna|Radomska| 42|
+---------+--------+---+



## 2.2. Czytanie danych z .csv

In [None]:
csv1 = spark.read.format('csv').load('data/best_selling_books.csv')
csv1.show()

+--------------------+--------------------+-----------------+---------------+--------------------+--------------------+
|                 _c0|                 _c1|              _c2|            _c3|                 _c4|                 _c5|
+--------------------+--------------------+-----------------+---------------+--------------------+--------------------+
|                Book|           Author(s)|Original language|First published|Approximate sales...|               Genre|
|A Tale of Two Cities|     Charles Dickens|          English|           1859|                 200|  Historical fiction|
|The Little Prince...|Antoine de Saint-...|           French|           1943|                 200|             Novella|
|Harry Potter and ...|       J. K. Rowling|          English|           1997|                 120|             Fantasy|
|And Then There We...|     Agatha Christie|          English|           1939|                 100|             Mystery|
|Dream of the Red ...|          Cao Xueq

In [None]:
csv2 = spark.read.format('csv').load('data/country-codes.csv')
csv2.show()

+--------------------+
|                 _c0|
+--------------------+
|  Afghanistan;AF;AFG|
|Åland Islands;AX;ALA|
|      Albania;AL;ALB|
|      Algeria;DZ;DZA|
|American Samoa;AS...|
|      Andorra;AD;AND|
|       Angola;AO;AGO|
|     Anguilla;AI;AIA|
|   Antarctica;AQ;ATA|
|Antigua and Barbu...|
|    Argentina;AR;ARG|
|      Armenia;AM;ARM|
|        Aruba;AW;ABW|
|    Australia;AU;AUS|
|      Austria;AT;AUT|
|   Azerbaijan;AZ;AZE|
|      Bahamas;BS;BHS|
|      Bahrain;BH;BHR|
|   Bangladesh;BD;BGD|
|     Barbados;BB;BRB|
+--------------------+
only showing top 20 rows



## 2.3. Konfiguracja odczytu .csv

In [None]:
csv3 = (
    spark.read
    .format('csv')
    .options(header=True, sep=",")
    .load('data/best_selling_books.csv')
)
csv3.show()

+--------------------+--------------------+-----------------+---------------+-----------------------------+--------------------+
|                Book|           Author(s)|Original language|First published|Approximate sales in millions|               Genre|
+--------------------+--------------------+-----------------+---------------+-----------------------------+--------------------+
|A Tale of Two Cities|     Charles Dickens|          English|           1859|                          200|  Historical fiction|
|The Little Prince...|Antoine de Saint-...|           French|           1943|                          200|             Novella|
|Harry Potter and ...|       J. K. Rowling|          English|           1997|                          120|             Fantasy|
|And Then There We...|     Agatha Christie|          English|           1939|                          100|             Mystery|
|Dream of the Red ...|          Cao Xueqin|          Chinese|           1791|                    

In [None]:
csv4 = (
    spark.read
    .format('csv')
    .options(header=False, sep=";")
    .load('data/country-codes.csv')
)
csv4.show()

+-------------------+---+---+
|                _c0|_c1|_c2|
+-------------------+---+---+
|        Afghanistan| AF|AFG|
|      Åland Islands| AX|ALA|
|            Albania| AL|ALB|
|            Algeria| DZ|DZA|
|     American Samoa| AS|ASM|
|            Andorra| AD|AND|
|             Angola| AO|AGO|
|           Anguilla| AI|AIA|
|         Antarctica| AQ|ATA|
|Antigua and Barbuda| AG|ATG|
|          Argentina| AR|ARG|
|            Armenia| AM|ARM|
|              Aruba| AW|ABW|
|          Australia| AU|AUS|
|            Austria| AT|AUT|
|         Azerbaijan| AZ|AZE|
|            Bahamas| BS|BHS|
|            Bahrain| BH|BHR|
|         Bangladesh| BD|BGD|
|           Barbados| BB|BRB|
+-------------------+---+---+
only showing top 20 rows



# 3. Schematy

## 3.1. Wyświetlanie schematu DF

In [None]:
df = spark.read.csv("data/Games.csv", header=True, quote="\"") # "Atari, Inc. (Windows)"
df.show()

+--------------------+-----+--------------------+-------+--------------------+--------------------+--------------------+
|                Name|Sales|              Series|Release|               Genre|           Developer|           Publisher|
+--------------------+-----+--------------------+-------+--------------------+--------------------+--------------------+
|PlayerUnknown's B...|   42|                NULL| Dec-17|       Battle royale|        PUBG Studios|             Krafton|
|           Minecraft|   33|           Minecraft| Nov-11|   Sandbox, survival|      Mojang Studios|      Mojang Studios|
|          Diablo III|   20|              Diablo| May-12| Action role-playing|Blizzard Entertai...|Blizzard Entertai...|
|         Garry's Mod|   20|                NULL| Nov-06|             Sandbox|   Facepunch Studios|               Valve|
|            Terraria| 17.2|                NULL| May-11|    Action-adventure|            Re-Logic|            Re-Logic|
|   World of Warcraft|   14|    

In [None]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Series: string (nullable = true)
 |-- Release: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Publisher: string (nullable = true)



## 3.2. Tworzenie schematu

In [None]:
spark.read.csv("data/best_selling_books.csv", header=True).printSchema()

root
 |-- Book: string (nullable = true)
 |-- Author(s): string (nullable = true)
 |-- Original language: string (nullable = true)
 |-- First published: string (nullable = true)
 |-- Approximate sales in millions: string (nullable = true)
 |-- Genre: string (nullable = true)



## 3.3. Implementacja schematu

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [None]:
schema = StructType(
    [
        StructField("Book", StringType(), False),
        StructField("Authors", StringType(), False),
        StructField("Original Language", StringType(), False),
        StructField("First published", IntegerType(), False),
        StructField("Sales", DoubleType(), False),
        StructField("Genre", StringType(), False)
    ]
)

In [None]:
print(schema)

StructType([StructField('Book', StringType(), False), StructField('Authors', StringType(), False), StructField('Original Language', StringType(), False), StructField('First published', IntegerType(), False), StructField('Sales', DoubleType(), False), StructField('Genre', StringType(), False)])


In [None]:
df = spark.read.csv("data/best_selling_books.csv", header=True, schema=schema)
df.printSchema()

root
 |-- Book: string (nullable = true)
 |-- Authors: string (nullable = true)
 |-- Original Language: string (nullable = true)
 |-- First published: integer (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Genre: string (nullable = true)



In [None]:
csv1 = spark.read.format("csv").schema(schema).load("data/best_selling_books.csv")
csv1.printSchema()

root
 |-- Book: string (nullable = true)
 |-- Authors: string (nullable = true)
 |-- Original Language: string (nullable = true)
 |-- First published: integer (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Genre: string (nullable = true)



In [None]:
csv1 = spark.read.format("csv").schema(schema).load("data/best_selling_books.csv")
csv1.show()

+--------------------+--------------------+-----------------+---------------+-----+--------------------+
|                Book|             Authors|Original Language|First published|Sales|               Genre|
+--------------------+--------------------+-----------------+---------------+-----+--------------------+
|                Book|           Author(s)|Original language|           NULL| NULL|               Genre|
|A Tale of Two Cities|     Charles Dickens|          English|           1859|200.0|  Historical fiction|
|The Little Prince...|Antoine de Saint-...|           French|           1943|200.0|             Novella|
|Harry Potter and ...|       J. K. Rowling|          English|           1997|120.0|             Fantasy|
|And Then There We...|     Agatha Christie|          English|           1939|100.0|             Mystery|
|Dream of the Red ...|          Cao Xueqin|          Chinese|           1791|100.0|         Family saga|
|          The Hobbit|    J. R. R. Tolkien|          En

# 4. Selekcja danych

## 4.1. Wyświetlanie wybranych kolumn

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col

In [None]:
spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
games_schema = StructType(
    [
        StructField("Name", StringType(), False),
        StructField("Sales", DoubleType(), False),
        StructField("Series", StringType(), True),
        StructField("Release", StringType(), False),
        StructField("Genre", StringType(), False),
        StructField("Developer", StringType(), False),
        StructField("Publisher", StringType(), False)
    ]
)

In [None]:
df = spark.read.csv("data/Games.csv", header=True, schema=games_schema)
df.show()

+--------------------+-----+--------------------+-------+--------------------+--------------------+--------------------+
|                Name|Sales|              Series|Release|               Genre|           Developer|           Publisher|
+--------------------+-----+--------------------+-------+--------------------+--------------------+--------------------+
|PlayerUnknown's B...| 42.0|                NULL| Dec-17|       Battle royale|        PUBG Studios|             Krafton|
|           Minecraft| 33.0|           Minecraft| Nov-11|   Sandbox, survival|      Mojang Studios|      Mojang Studios|
|          Diablo III| 20.0|              Diablo| May-12| Action role-playing|Blizzard Entertai...|Blizzard Entertai...|
|         Garry's Mod| 20.0|                NULL| Nov-06|             Sandbox|   Facepunch Studios|               Valve|
|            Terraria| 17.2|                NULL| May-11|    Action-adventure|            Re-Logic|            Re-Logic|
|   World of Warcraft| 14.0|    

In [None]:
df.select (col('name'), col('sales'), col('developer')).show()

+--------------------+-----+--------------------+
|                name|sales|           developer|
+--------------------+-----+--------------------+
|PlayerUnknown's B...| 42.0|        PUBG Studios|
|           Minecraft| 33.0|      Mojang Studios|
|          Diablo III| 20.0|Blizzard Entertai...|
|         Garry's Mod| 20.0|   Facepunch Studios|
|            Terraria| 17.2|            Re-Logic|
|   World of Warcraft| 14.0|Blizzard Entertai...|
|         Half-Life 2| 12.0|               Valve|
|The Witcher 3: Wi...| 12.0|      CD Projekt Red|
|           StarCraft| 11.0|Blizzard Entertai...|
|            The Sims| 11.0|               Maxis|
|           Fall Guys| 10.0|          Mediatonic|
|RollerCoaster Tyc...| 10.0|Frontier Developm...|
|           Half-Life|  9.0|               Valve|
|                Rust|  9.0|   Facepunch Studios|
|      Civilization V|  8.0|       Firaxis Games|
|          The Sims 3|  7.0|               Maxis|
|Euro Truck Simula...|  6.5|        SCS Software|


In [None]:
df.select('name', 'sales', 'developer').show()

+--------------------+-----+--------------------+
|                name|sales|           developer|
+--------------------+-----+--------------------+
|PlayerUnknown's B...| 42.0|        PUBG Studios|
|           Minecraft| 33.0|      Mojang Studios|
|          Diablo III| 20.0|Blizzard Entertai...|
|         Garry's Mod| 20.0|   Facepunch Studios|
|            Terraria| 17.2|            Re-Logic|
|   World of Warcraft| 14.0|Blizzard Entertai...|
|         Half-Life 2| 12.0|               Valve|
|The Witcher 3: Wi...| 12.0|      CD Projekt Red|
|           StarCraft| 11.0|Blizzard Entertai...|
|            The Sims| 11.0|               Maxis|
|           Fall Guys| 10.0|          Mediatonic|
|RollerCoaster Tyc...| 10.0|Frontier Developm...|
|           Half-Life|  9.0|               Valve|
|                Rust|  9.0|   Facepunch Studios|
|      Civilization V|  8.0|       Firaxis Games|
|          The Sims 3|  7.0|               Maxis|
|Euro Truck Simula...|  6.5|        SCS Software|


## 4.2. Sortowanie danych

In [None]:
df.orderBy( col("developer").asc(), col("sales").desc() ).show()

+--------------------+-----+------------------+-------+--------------------+--------------------+--------------------+
|                Name|Sales|            Series|Release|               Genre|           Developer|           Publisher|
+--------------------+-----+------------------+-------+--------------------+--------------------+--------------------+
|       Duke Nukem 3D|  1.0|        Duke Nukem| Jan-96|First-person shooter|           3D Realms|GT Interactive So...|
|         Machinarium|  1.0|              NULL| Oct-09|Graphic adventure...|      Amanita Design|      Amanita Design|
|          Guild Wars|  6.0|        Guild Wars| Apr-05|              MMORPG|            ArenaNet|              NCsoft|
|        Guild Wars 2|  5.0|        Guild Wars| Aug-12|              MMORPG|            ArenaNet|              NCsoft|
|             Magicka|  2.0|              NULL| Jan-11|    Action-adventure|Arrowhead Game St...| Paradox Interactive|
|Patrician III: Ri...|  1.0|     The Patrician| 

In [None]:
df.show(3, truncate=False)

+-----------------------------+-----+---------+-------+-------------------+----------------------+----------------------+
|Name                         |Sales|Series   |Release|Genre              |Developer             |Publisher             |
+-----------------------------+-----+---------+-------+-------------------+----------------------+----------------------+
|PlayerUnknown's Battlegrounds|42.0 |NULL     |Dec-17 |Battle royale      |PUBG Studios          |Krafton               |
|Minecraft                    |33.0 |Minecraft|Nov-11 |Sandbox, survival  |Mojang Studios        |Mojang Studios        |
|Diablo III                   |20.0 |Diablo   |May-12 |Action role-playing|Blizzard Entertainment|Blizzard Entertainment|
+-----------------------------+-----+---------+-------+-------------------+----------------------+----------------------+
only showing top 3 rows



In [None]:
df.limit(5).show(10)

+--------------------+-----+---------+-------+-------------------+--------------------+--------------------+
|                Name|Sales|   Series|Release|              Genre|           Developer|           Publisher|
+--------------------+-----+---------+-------+-------------------+--------------------+--------------------+
|PlayerUnknown's B...| 42.0|     NULL| Dec-17|      Battle royale|        PUBG Studios|             Krafton|
|           Minecraft| 33.0|Minecraft| Nov-11|  Sandbox, survival|      Mojang Studios|      Mojang Studios|
|          Diablo III| 20.0|   Diablo| May-12|Action role-playing|Blizzard Entertai...|Blizzard Entertai...|
|         Garry's Mod| 20.0|     NULL| Nov-06|            Sandbox|   Facepunch Studios|               Valve|
|            Terraria| 17.2|     NULL| May-11|   Action-adventure|            Re-Logic|            Re-Logic|
+--------------------+-----+---------+-------+-------------------+--------------------+--------------------+



## 4.3. Limit i collect

In [None]:
df.limit(1).collect()

[Row(Name="PlayerUnknown's Battlegrounds", Sales=42.0, Series=None, Release='Dec-17', Genre='Battle royale', Developer='PUBG Studios', Publisher='Krafton')]

In [None]:
df.limit(1).collect()[0]

Row(Name="PlayerUnknown's Battlegrounds", Sales=42.0, Series=None, Release='Dec-17', Genre='Battle royale', Developer='PUBG Studios', Publisher='Krafton')

In [None]:
df.limit(1).collect()[0][1]

42.0

## 4.4. Dodawanie kolumny

In [None]:
df = df.withColumn('SalesX1000', col('sales') * 1000 )

In [None]:
df.show()

+--------------------+-----+--------------------+-------+--------------------+--------------------+--------------------+----------+
|                Name|Sales|              Series|Release|               Genre|           Developer|           Publisher|SalesX1000|
+--------------------+-----+--------------------+-------+--------------------+--------------------+--------------------+----------+
|PlayerUnknown's B...| 42.0|                NULL| Dec-17|       Battle royale|        PUBG Studios|             Krafton|   42000.0|
|           Minecraft| 33.0|           Minecraft| Nov-11|   Sandbox, survival|      Mojang Studios|      Mojang Studios|   33000.0|
|          Diablo III| 20.0|              Diablo| May-12| Action role-playing|Blizzard Entertai...|Blizzard Entertai...|   20000.0|
|         Garry's Mod| 20.0|                NULL| Nov-06|             Sandbox|   Facepunch Studios|               Valve|   20000.0|
|            Terraria| 17.2|                NULL| May-11|    Action-adventur

# 5. Kolekcje, daty i funkcje

## 5.1. Lista i słownik w DataFrame

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, size, lit, explode

In [None]:
spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("first", StringType(), False),
        StructField("last", StringType(), False),
        StructField("skills", ArrayType(StringType()), False),
        StructField("salary", IntegerType(), False),
        StructField("role", MapType(StringType(), StringType()), False),
        StructField("status", StringType(), True)
    ]
)

In [None]:
emp = spark.createDataFrame(
    [
        (1, "Adam", "Nowak", ["SQL", "Java", "GCP"], 3500, {"position": "Java Developer", "level": "1"}, None),
        (2, "Jan", "Kowalski", ["SQL", "Java", "Azure", "Spring"], 8000, {"position": "Java Developer", "level": "3"}, "Active"),
        (3, "Dominik", "Bajt", ["Python", "MongoDB", "Redis"], 4000, {"position": "Data Developer", "level": "1"}, None),
        (4, "Ewa", "Piksel", ["SQL", "Python", "Pandas", ], 4100, {"position": "Data Scientist", "level": "1"}, "Fired"),
        (5, "Krzysztof", "Zależność", ["Git", "CI/CD", "Docker"], 8000, {"position": "DevOps", "level": "2"}, "Active"),
        (6, "Ewa", "Kierownik", ["Azure", "GCP", "AWS", "Linux"], 12500, {"position": "Cloud Architect", "level": "2"}, "Fired"),
        (7, "Adam", "Kowalski", ["Git", "CI/CD", "Docker", "Linux", "Kubernetes"], 10500, {"position": "DevOps", "level": "3"}, "New"),
        (8, "Dominika", "Praktyczna", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, None),
        (9, "Jan", "Praktyczny", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, "Active"),
        (10, "Mikołaj", "Sobieski", ["Python", "Django", "Flask"], 7500, {"position": "Python Developer", "level": "1"}, "New")
    ],
    schema
)

In [None]:
emp.printSchema()

root
 |-- id: integer (nullable = false)
 |-- first: string (nullable = false)
 |-- last: string (nullable = false)
 |-- skills: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- salary: integer (nullable = false)
 |-- role: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- status: string (nullable = true)



In [None]:
emp.limit(3).show(truncate=False)

+---+-------+--------+--------------------------+------+----------------------------------------+------+
|id |first  |last    |skills                    |salary|role                                    |status|
+---+-------+--------+--------------------------+------+----------------------------------------+------+
|1  |Adam   |Nowak   |[SQL, Java, GCP]          |3500  |{level -> 1, position -> Java Developer}|NULL  |
|2  |Jan    |Kowalski|[SQL, Java, Azure, Spring]|8000  |{level -> 3, position -> Java Developer}|Active|
|3  |Dominik|Bajt    |[Python, MongoDB, Redis]  |4000  |{level -> 1, position -> Data Developer}|NULL  |
+---+-------+--------+--------------------------+------+----------------------------------------+------+



In [None]:
emp.limit(3).show()

+---+-------+--------+--------------------+------+--------------------+------+
| id|  first|    last|              skills|salary|                role|status|
+---+-------+--------+--------------------+------+--------------------+------+
|  1|   Adam|   Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|
|  2|    Jan|Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|
|  3|Dominik|    Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|
+---+-------+--------+--------------------+------+--------------------+------+



## 5.2. getItem oraz size

In [None]:
emp.select( col("skills")[1], col("role")["level"] ).show()

+---------+-----------+
|skills[1]|role[level]|
+---------+-----------+
|     Java|          1|
|     Java|          3|
|  MongoDB|          1|
|   Python|          1|
|    CI/CD|          2|
|      GCP|          2|
|    CI/CD|          3|
|     Java|          0|
|     Java|          0|
|   Django|          1|
+---------+-----------+



In [None]:
emp.select(col("skills").getItem(1), col("role").getItem('position')).show()

+---------+----------------+
|skills[1]|  role[position]|
+---------+----------------+
|     Java|  Java Developer|
|     Java|  Java Developer|
|  MongoDB|  Data Developer|
|   Python|  Data Scientist|
|    CI/CD|          DevOps|
|      GCP| Cloud Architect|
|    CI/CD|          DevOps|
|     Java|          Intern|
|     Java|          Intern|
|   Django|Python Developer|
+---------+----------------+



In [None]:
emp.select(
    col("skills").getItem(1),
    col("role").getItem('position'),
    size(col('skills')),
    size(col('role'))
).show()

+---------+----------------+------------+----------+
|skills[1]|  role[position]|size(skills)|size(role)|
+---------+----------------+------------+----------+
|     Java|  Java Developer|           3|         2|
|     Java|  Java Developer|           4|         2|
|  MongoDB|  Data Developer|           3|         2|
|   Python|  Data Scientist|           3|         2|
|    CI/CD|          DevOps|           3|         2|
|      GCP| Cloud Architect|           4|         2|
|    CI/CD|          DevOps|           5|         2|
|     Java|          Intern|           3|         2|
|     Java|          Intern|           3|         2|
|   Django|Python Developer|           3|         2|
+---------+----------------+------------+----------+



## 5.3. lit i explode

In [None]:
emp.withColumn("company", lit('Dziurex')).show()

+---+---------+----------+--------------------+------+--------------------+------+-------+
| id|    first|      last|              skills|salary|                role|status|company|
+---+---------+----------+--------------------+------+--------------------+------+-------+
|  1|     Adam|     Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|Dziurex|
|  2|      Jan|  Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|Dziurex|
|  3|  Dominik|      Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|Dziurex|
|  4|      Ewa|    Piksel|[SQL, Python, Pan...|  4100|{level -> 1, posi...| Fired|Dziurex|
|  5|Krzysztof| Zależność|[Git, CI/CD, Docker]|  8000|{level -> 2, posi...|Active|Dziurex|
|  6|      Ewa| Kierownik|[Azure, GCP, AWS,...| 12500|{level -> 2, posi...| Fired|Dziurex|
|  7|     Adam|  Kowalski|[Git, CI/CD, Dock...| 10500|{level -> 3, posi...|   New|Dziurex|
|  8| Dominika|Praktyczna| [SQL, Java, Python]|  3000|{level -> 0, posi...|  NULL|Dziurex|

In [None]:
emp.select(
    col('id'),
    explode(col('skills'))
).show()

+---+-------+
| id|    col|
+---+-------+
|  1|    SQL|
|  1|   Java|
|  1|    GCP|
|  2|    SQL|
|  2|   Java|
|  2|  Azure|
|  2| Spring|
|  3| Python|
|  3|MongoDB|
|  3|  Redis|
|  4|    SQL|
|  4| Python|
|  4| Pandas|
|  5|    Git|
|  5|  CI/CD|
|  5| Docker|
|  6|  Azure|
|  6|    GCP|
|  6|    AWS|
|  6|  Linux|
+---+-------+
only showing top 20 rows



In [None]:
emp.select(
    col('id'),
    explode(col('role'))
).show()

+---+--------+----------------+
| id|     key|           value|
+---+--------+----------------+
|  1|   level|               1|
|  1|position|  Java Developer|
|  2|   level|               3|
|  2|position|  Java Developer|
|  3|   level|               1|
|  3|position|  Data Developer|
|  4|   level|               1|
|  4|position|  Data Scientist|
|  5|   level|               2|
|  5|position|          DevOps|
|  6|   level|               2|
|  6|position| Cloud Architect|
|  7|   level|               3|
|  7|position|          DevOps|
|  8|   level|               0|
|  8|position|          Intern|
|  9|   level|               0|
|  9|position|          Intern|
| 10|   level|               1|
| 10|position|Python Developer|
+---+--------+----------------+



## 5.4. Konkatenacja

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second
)

spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("first", StringType(), False),
        StructField("last", StringType(), False),
        StructField("skills", ArrayType(StringType()), False),
        StructField("salary", IntegerType(), False),
        StructField("role", MapType(StringType(), StringType()), False),
        StructField("status", StringType(), True)
    ]
)

In [None]:
emp = spark.createDataFrame(
    [
        (1, "Adam", "Nowak", ["SQL", "Java", "GCP"], 3500, {"position": "Java Developer", "level": "1"}, None),
        (2, "Jan", "Kowalski", ["SQL", "Java", "Azure", "Spring"], 8000, {"position": "Java Developer", "level": "3"}, "Active"),
        (3, "Dominik", "Bajt", ["Python", "MongoDB", "Redis"], 4000, {"position": "Data Developer", "level": "1"}, None),
        (4, "Ewa", "Piksel", ["SQL", "Python", "Pandas", ], 4100, {"position": "Data Scientist", "level": "1"}, "Fired"),
        (5, "Krzysztof", "Zależność", ["Git", "CI/CD", "Docker"], 8000, {"position": "DevOps", "level": "2"}, "Active"),
        (6, "Ewa", "Kierownik", ["Azure", "GCP", "AWS", "Linux"], 12500, {"position": "Cloud Architect", "level": "2"}, "Fired"),
        (7, "Adam", "Kowalski", ["Git", "CI/CD", "Docker", "Linux", "Kubernetes"], 10500, {"position": "DevOps", "level": "3"}, "New"),
        (8, "Dominika", "Praktyczna", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, None),
        (9, "Jan", "Praktyczny", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, "Active"),
        (10, "Mikołaj", "Sobieski", ["Python", "Django", "Flask"], 7500, {"position": "Python Developer", "level": "1"}, "New")
    ],
    schema
)

In [None]:
emp.withColumn("employee", concat(col('first'), lit(' '), col('last'))).show()

+---+---------+----------+--------------------+------+--------------------+------+-------------------+
| id|    first|      last|              skills|salary|                role|status|           employee|
+---+---------+----------+--------------------+------+--------------------+------+-------------------+
|  1|     Adam|     Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|         Adam Nowak|
|  2|      Jan|  Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|       Jan Kowalski|
|  3|  Dominik|      Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|       Dominik Bajt|
|  4|      Ewa|    Piksel|[SQL, Python, Pan...|  4100|{level -> 1, posi...| Fired|         Ewa Piksel|
|  5|Krzysztof| Zależność|[Git, CI/CD, Docker]|  8000|{level -> 2, posi...|Active|Krzysztof Zależność|
|  6|      Ewa| Kierownik|[Azure, GCP, AWS,...| 12500|{level -> 2, posi...| Fired|      Ewa Kierownik|
|  7|     Adam|  Kowalski|[Git, CI/CD, Dock...| 10500|{level -> 3, posi..

In [None]:
emp.select(
    concat_ws(',', col('id'), col('first'), col('last'))
).show()

+-----------------------------+
|concat_ws(,, id, first, last)|
+-----------------------------+
|                 1,Adam,Nowak|
|               2,Jan,Kowalski|
|               3,Dominik,Bajt|
|                 4,Ewa,Piksel|
|         5,Krzysztof,Zależ...|
|              6,Ewa,Kierownik|
|              7,Adam,Kowalski|
|         8,Dominika,Prakty...|
|             9,Jan,Praktyczny|
|          10,Mikołaj,Sobieski|
+-----------------------------+



## 5.5. substring

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second
)

spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("first", StringType(), False),
        StructField("last", StringType(), False),
        StructField("skills", ArrayType(StringType()), False),
        StructField("salary", IntegerType(), False),
        StructField("role", MapType(StringType(), StringType()), False),
        StructField("status", StringType(), True)
    ]
)

In [None]:
emp = spark.createDataFrame(
    [
        (1, "Adam", "Nowak", ["SQL", "Java", "GCP"], 3500, {"position": "Java Developer", "level": "1"}, None),
        (2, "Jan", "Kowalski", ["SQL", "Java", "Azure", "Spring"], 8000, {"position": "Java Developer", "level": "3"}, "Active"),
        (3, "Dominik", "Bajt", ["Python", "MongoDB", "Redis"], 4000, {"position": "Data Developer", "level": "1"}, None),
        (4, "Ewa", "Piksel", ["SQL", "Python", "Pandas", ], 4100, {"position": "Data Scientist", "level": "1"}, "Fired"),
        (5, "Krzysztof", "Zależność", ["Git", "CI/CD", "Docker"], 8000, {"position": "DevOps", "level": "2"}, "Active"),
        (6, "Ewa", "Kierownik", ["Azure", "GCP", "AWS", "Linux"], 12500, {"position": "Cloud Architect", "level": "2"}, "Fired"),
        (7, "Adam", "Kowalski", ["Git", "CI/CD", "Docker", "Linux", "Kubernetes"], 10500, {"position": "DevOps", "level": "3"}, "New"),
        (8, "Dominika", "Praktyczna", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, None),
        (9, "Jan", "Praktyczny", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, "Active"),
        (10, "Mikołaj", "Sobieski", ["Python", "Django", "Flask"], 7500, {"position": "Python Developer", "level": "1"}, "New")
    ],
    schema
)

In [None]:
emp.select(
    substring(col('first'), 0, 2), # funkcja substring wycina podciąg (fragment tekstu)
    col('first')[0:2] # alternatywna metoda wykorzystująca notację slice
).show()

+----------------------+----------------------+
|substring(first, 0, 2)|substring(first, 0, 2)|
+----------------------+----------------------+
|                    Ad|                    Ad|
|                    Ja|                    Ja|
|                    Do|                    Do|
|                    Ew|                    Ew|
|                    Kr|                    Kr|
|                    Ew|                    Ew|
|                    Ad|                    Ad|
|                    Do|                    Do|
|                    Ja|                    Ja|
|                    Mi|                    Mi|
+----------------------+----------------------+



In [None]:
emp.select(
    substring(col('first'), 5, 2),
    col('first')[5:2]
).show()

+----------------------+----------------------+
|substring(first, 5, 2)|substring(first, 5, 2)|
+----------------------+----------------------+
|                      |                      |
|                      |                      |
|                    ni|                    ni|
|                      |                      |
|                    sz|                    sz|
|                      |                      |
|                      |                      |
|                    ni|                    ni|
|                      |                      |
|                    ła|                    ła|
+----------------------+----------------------+



## 5.6. DateType i TimestampType

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second
)

from datetime import datetime

spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("first", StringType(), False),
        StructField("last", StringType(), False),
        StructField("skills", ArrayType(StringType()), False),
        StructField("salary", IntegerType(), False),
        StructField("role", MapType(StringType(), StringType()), False),
        StructField("status", StringType(), True),
        StructField("hire_date", DateType(), True),
        StructField("hire_timestamp", TimestampType(), True)
    ]
)

emp = spark.createDataFrame(
    [
        (1, "Adam", "Nowak", ["SQL", "Java", "GCP"], 3500, {"position": "Java Developer", "level": "1"}, None,
         datetime(2023, 5, 1), datetime(2023, 5, 1, 12, 0, 0)),
        (2, "Jan", "Kowalski", ["SQL", "Java", "Azure", "Spring"], 8000, {"position": "Java Developer", "level": "3"}, "Active",
         datetime(2023, 5, 10), datetime(2023, 5, 10, 16, 0, 0)),
        (3, "Dominik", "Bajt", ["Python", "MongoDB", "Redis"], 4000, {"position": "Data Developer", "level": "1"}, None,
         datetime(2023, 5, 15), datetime(2023, 5, 15, 8, 0, 0)),
        (4, "Ewa", "Piksel", ["SQL", "Python", "Pandas", ], 4100, {"position": "Data Scientist", "level": "1"}, "Fired",
         datetime(2023, 6, 10), datetime(2023, 6, 1, 11, 0, 0)),
        (5, "Krzysztof", "Zależność", ["Git", "CI/CD", "Docker"], 8000, {"position": "DevOps", "level": "2"}, "Active",
         datetime(2023, 6, 15), datetime(2023, 6, 15, 11, 30, 0)),
        (6, "Ewa", "Kierownik", ["Azure", "GCP", "AWS", "Linux"], 12500, {"position": "Cloud Architect", "level": "2"}, "Fired",
         datetime(2023, 6, 20), datetime(2023, 6, 20, 12, 0)),
        (7, "Adam", "Kowalski", ["Git", "CI/CD", "Docker", "Linux", "Kubernetes"], 10500, {"position": "DevOps", "level": "3"}, "New",
         datetime(2023, 1, 20), datetime(2023, 1, 20, 9, 0)),
        (8, "Dominika", "Praktyczna", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, None,
         datetime(2023, 1, 30), datetime(2023, 1, 30, 7, 0)),
        (9, "Jan", "Praktyczny", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, "Active",
         datetime(2023, 3, 20), datetime(2023, 3, 20, 11, 45)),
        (10, "Mikołaj", "Sobieski", ["Python", "Django", "Flask"], 7500, {"position": "Python Developer", "level": "1"}, "New",
         datetime(2023, 1, 20), datetime(2023, 1, 20, 8, 35))
    ],
    schema
)

In [None]:
emp.show()

+---+---------+----------+--------------------+------+--------------------+------+----------+-------------------+
| id|    first|      last|              skills|salary|                role|status| hire_date|     hire_timestamp|
+---+---------+----------+--------------------+------+--------------------+------+----------+-------------------+
|  1|     Adam|     Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|
|  2|      Jan|  Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-10 16:00:00|
|  3|  Dominik|      Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|
|  4|      Ewa|    Piksel|[SQL, Python, Pan...|  4100|{level -> 1, posi...| Fired|2023-06-10|2023-06-01 11:00:00|
|  5|Krzysztof| Zależność|[Git, CI/CD, Docker]|  8000|{level -> 2, posi...|Active|2023-06-15|2023-06-15 11:30:00|
|  6|      Ewa| Kierownik|[Azure, GCP, AWS,...| 12500|{level -> 2, posi...| Fired|2023-0

## 5.7. datediff

In [None]:
emp.select(
    datediff(col('hire_date'), lit(datetime(2023, 9, 1))),
    datediff(lit(datetime(2023, 9, 1)), col('hire_date'))
).show()

+----------------------------------------------------+----------------------------------------------------+
|datediff(hire_date, TIMESTAMP '2023-09-01 00:00:00')|datediff(TIMESTAMP '2023-09-01 00:00:00', hire_date)|
+----------------------------------------------------+----------------------------------------------------+
|                                                -123|                                                 123|
|                                                -114|                                                 114|
|                                                -109|                                                 109|
|                                                 -83|                                                  83|
|                                                 -78|                                                  78|
|                                                 -73|                                                  73|
|                           

## 5.8. date_add/date_sub

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second
)

from datetime import datetime

spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("first", StringType(), False),
        StructField("last", StringType(), False),
        StructField("skills", ArrayType(StringType()), False),
        StructField("salary", IntegerType(), False),
        StructField("role", MapType(StringType(), StringType()), False),
        StructField("status", StringType(), True),
        StructField("hire_date", DateType(), True),
        StructField("hire_timestamp", TimestampType(), True)
    ]
)

emp = spark.createDataFrame(
    [
        (1, "Adam", "Nowak", ["SQL", "Java", "GCP"], 3500, {"position": "Java Developer", "level": "1"}, None,
         datetime(2023, 5, 1), datetime(2023, 5, 1, 12, 0, 0)),
        (2, "Jan", "Kowalski", ["SQL", "Java", "Azure", "Spring"], 8000, {"position": "Java Developer", "level": "3"}, "Active",
         datetime(2023, 5, 10), datetime(2023, 5, 10, 16, 0, 0)),
        (3, "Dominik", "Bajt", ["Python", "MongoDB", "Redis"], 4000, {"position": "Data Developer", "level": "1"}, None,
         datetime(2023, 5, 15), datetime(2023, 5, 15, 8, 0, 0)),
        (4, "Ewa", "Piksel", ["SQL", "Python", "Pandas", ], 4100, {"position": "Data Scientist", "level": "1"}, "Fired",
         datetime(2023, 6, 10), datetime(2023, 6, 1, 11, 0, 0)),
        (5, "Krzysztof", "Zależność", ["Git", "CI/CD", "Docker"], 8000, {"position": "DevOps", "level": "2"}, "Active",
         datetime(2023, 6, 15), datetime(2023, 6, 15, 11, 30, 0)),
        (6, "Ewa", "Kierownik", ["Azure", "GCP", "AWS", "Linux"], 12500, {"position": "Cloud Architect", "level": "2"}, "Fired",
         datetime(2023, 6, 20), datetime(2023, 6, 20, 12, 0)),
        (7, "Adam", "Kowalski", ["Git", "CI/CD", "Docker", "Linux", "Kubernetes"], 10500, {"position": "DevOps", "level": "3"}, "New",
         datetime(2023, 1, 20), datetime(2023, 1, 20, 9, 0)),
        (8, "Dominika", "Praktyczna", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, None,
         datetime(2023, 1, 30), datetime(2023, 1, 30, 7, 0)),
        (9, "Jan", "Praktyczny", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, "Active",
         datetime(2023, 3, 20), datetime(2023, 3, 20, 11, 45)),
        (10, "Mikołaj", "Sobieski", ["Python", "Django", "Flask"], 7500, {"position": "Python Developer", "level": "1"}, "New",
         datetime(2023, 1, 20), datetime(2023, 1, 20, 8, 35))
    ],
    schema
)

In [None]:
emp.select(
    col('hire_date'),
    date_add(col('hire_date'), 10),
    date_sub(col('hire_date'), 30)
).show()

+----------+-----------------------+-----------------------+
| hire_date|date_add(hire_date, 10)|date_sub(hire_date, 30)|
+----------+-----------------------+-----------------------+
|2023-05-01|             2023-05-11|             2023-04-01|
|2023-05-10|             2023-05-20|             2023-04-10|
|2023-05-15|             2023-05-25|             2023-04-15|
|2023-06-10|             2023-06-20|             2023-05-11|
|2023-06-15|             2023-06-25|             2023-05-16|
|2023-06-20|             2023-06-30|             2023-05-21|
|2023-01-20|             2023-01-30|             2022-12-21|
|2023-01-30|             2023-02-09|             2022-12-31|
|2023-03-20|             2023-03-30|             2023-02-18|
|2023-01-20|             2023-01-30|             2022-12-21|
+----------+-----------------------+-----------------------+



In [None]:
emp.select(
    col('hire_date'),
    date_add(col('hire_date'), -10),
    date_sub(col('hire_date'), -30)
).show()

+----------+------------------------+------------------------+
| hire_date|date_add(hire_date, -10)|date_sub(hire_date, -30)|
+----------+------------------------+------------------------+
|2023-05-01|              2023-04-21|              2023-05-31|
|2023-05-10|              2023-04-30|              2023-06-09|
|2023-05-15|              2023-05-05|              2023-06-14|
|2023-06-10|              2023-05-31|              2023-07-10|
|2023-06-15|              2023-06-05|              2023-07-15|
|2023-06-20|              2023-06-10|              2023-07-20|
|2023-01-20|              2023-01-10|              2023-02-19|
|2023-01-30|              2023-01-20|              2023-03-01|
|2023-03-20|              2023-03-10|              2023-04-19|
|2023-01-20|              2023-01-10|              2023-02-19|
+----------+------------------------+------------------------+



## 5.9. Ekstrakcja danej jednostki czasu

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second
)

from datetime import datetime

spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("first", StringType(), False),
        StructField("last", StringType(), False),
        StructField("skills", ArrayType(StringType()), False),
        StructField("salary", IntegerType(), False),
        StructField("role", MapType(StringType(), StringType()), False),
        StructField("status", StringType(), True),
        StructField("hire_date", DateType(), True),
        StructField("hire_timestamp", TimestampType(), True)
    ]
)

emp = spark.createDataFrame(
    [
        (1, "Adam", "Nowak", ["SQL", "Java", "GCP"], 3500, {"position": "Java Developer", "level": "1"}, None,
         datetime(2023, 5, 1), datetime(2023, 5, 1, 12, 0, 0)),
        (2, "Jan", "Kowalski", ["SQL", "Java", "Azure", "Spring"], 8000, {"position": "Java Developer", "level": "3"}, "Active",
         datetime(2023, 5, 10), datetime(2023, 5, 10, 16, 0, 0)),
        (3, "Dominik", "Bajt", ["Python", "MongoDB", "Redis"], 4000, {"position": "Data Developer", "level": "1"}, None,
         datetime(2023, 5, 15), datetime(2023, 5, 15, 8, 0, 0)),
        (4, "Ewa", "Piksel", ["SQL", "Python", "Pandas", ], 4100, {"position": "Data Scientist", "level": "1"}, "Fired",
         datetime(2023, 6, 10), datetime(2023, 6, 1, 11, 0, 0)),
        (5, "Krzysztof", "Zależność", ["Git", "CI/CD", "Docker"], 8000, {"position": "DevOps", "level": "2"}, "Active",
         datetime(2023, 6, 15), datetime(2023, 6, 15, 11, 30, 0)),
        (6, "Ewa", "Kierownik", ["Azure", "GCP", "AWS", "Linux"], 12500, {"position": "Cloud Architect", "level": "2"}, "Fired",
         datetime(2023, 6, 20), datetime(2023, 6, 20, 12, 0)),
        (7, "Adam", "Kowalski", ["Git", "CI/CD", "Docker", "Linux", "Kubernetes"], 10500, {"position": "DevOps", "level": "3"}, "New",
         datetime(2023, 1, 20), datetime(2023, 1, 20, 9, 0)),
        (8, "Dominika", "Praktyczna", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, None,
         datetime(2023, 1, 30), datetime(2023, 1, 30, 7, 0)),
        (9, "Jan", "Praktyczny", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, "Active",
         datetime(2023, 3, 20), datetime(2023, 3, 20, 11, 45)),
        (10, "Mikołaj", "Sobieski", ["Python", "Django", "Flask"], 7500, {"position": "Python Developer", "level": "1"}, "New",
         datetime(2023, 1, 20), datetime(2023, 1, 20, 8, 35))
    ],
    schema
)

In [None]:
(
    emp
    .withColumn( 'year', year(col('hire_timestamp')) )
    .withColumn( 'day_of_week', dayofweek(col('hire_timestamp')) )
    .withColumn( 'week_of_year', weekofyear(col('hire_timestamp')) )
    .withColumn( 'hour', hour(col('hire_timestamp')) )
    .limit(3)
    .show()
)

+---+-------+--------+--------------------+------+--------------------+------+----------+-------------------+----+-----------+------------+----+
| id|  first|    last|              skills|salary|                role|status| hire_date|     hire_timestamp|year|day_of_week|week_of_year|hour|
+---+-------+--------+--------------------+------+--------------------+------+----------+-------------------+----+-----------+------------+----+
|  1|   Adam|   Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|2023|          2|          18|  12|
|  2|    Jan|Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-10 16:00:00|2023|          4|          19|  16|
|  3|Dominik|    Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|2023|          2|          20|   8|
+---+-------+--------+--------------------+------+--------------------+------+----------+-------------------+----+-----------+----

# 6. Filtrowanie danych

## 6.1. Unikatowe wiersze

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second
)

from datetime import datetime

spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("first", StringType(), False),
        StructField("last", StringType(), False),
        StructField("skills", ArrayType(StringType()), False),
        StructField("salary", IntegerType(), False),
        StructField("role", MapType(StringType(), StringType()), False),
        StructField("status", StringType(), True),
        StructField("hire_date", DateType(), True),
        StructField("hire_timestamp", TimestampType(), True)
    ]
)

emp = spark.createDataFrame(
    [
        (1, "Adam", "Nowak", ["SQL", "Java", "GCP"], 3500, {"position": "Java Developer", "level": "1"}, None,
         datetime(2023, 5, 1), datetime(2023, 5, 1, 12, 0, 0)),
        (2, "Jan", "Kowalski", ["SQL", "Java", "Azure", "Spring"], 8000, {"position": "Java Developer", "level": "3"}, "Active",
         datetime(2023, 5, 10), datetime(2023, 5, 10, 16, 0, 0)),
        (3, "Dominik", "Bajt", ["Python", "MongoDB", "Redis"], 4000, {"position": "Data Developer", "level": "1"}, None,
         datetime(2023, 5, 15), datetime(2023, 5, 15, 8, 0, 0)),
        (4, "Ewa", "Piksel", ["SQL", "Python", "Pandas", ], 4100, {"position": "Data Scientist", "level": "1"}, "Fired",
         datetime(2023, 6, 10), datetime(2023, 6, 1, 11, 0, 0)),
        (5, "Krzysztof", "Zależność", ["Git", "CI/CD", "Docker"], 8000, {"position": "DevOps", "level": "2"}, "Active",
         datetime(2023, 6, 15), datetime(2023, 6, 15, 11, 30, 0)),
        (6, "Ewa", "Kierownik", ["Azure", "GCP", "AWS", "Linux"], 12500, {"position": "Cloud Architect", "level": "2"}, "Fired",
         datetime(2023, 6, 20), datetime(2023, 6, 20, 12, 0)),
        (7, "Adam", "Kowalski", ["Git", "CI/CD", "Docker", "Linux", "Kubernetes"], 10500, {"position": "DevOps", "level": "3"}, "New",
         datetime(2023, 1, 20), datetime(2023, 1, 20, 9, 0)),
        (8, "Dominika", "Praktyczna", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, None,
         datetime(2023, 1, 30), datetime(2023, 1, 30, 7, 0)),
        (9, "Jan", "Praktyczny", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, "Active",
         datetime(2023, 3, 20), datetime(2023, 3, 20, 11, 45)),
        (10, "Mikołaj", "Sobieski", ["Python", "Django", "Flask"], 7500, {"position": "Python Developer", "level": "1"}, "New",
         datetime(2023, 1, 20), datetime(2023, 1, 20, 8, 35))
    ],
    schema
)

In [None]:
emp.printSchema()

root
 |-- id: integer (nullable = false)
 |-- first: string (nullable = false)
 |-- last: string (nullable = false)
 |-- skills: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- salary: integer (nullable = false)
 |-- role: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- status: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- hire_timestamp: timestamp (nullable = true)



In [None]:
games = spark.read.csv('data/Games.csv', header=True).select(col('Series'), col('Developer'), col('Publisher')).orderBy(col('Developer'))

In [None]:
games.limit(10).show(truncate=False)

+------------------+----------------------+-----------------------+
|Series            |Developer             |Publisher              |
+------------------+----------------------+-----------------------+
|Duke Nukem        |3D Realms             |GT Interactive Software|
|NULL              |Amanita Design        |Amanita Design         |
|Guild Wars        |ArenaNet              |NCsoft                 |
|Guild Wars        |ArenaNet              |NCsoft                 |
|NULL              |Arrowhead Game Studios|Paradox Interactive    |
|The Patrician     |Ascaron               |Encore                 |
|Sacred            |Ascaron               |Encore                 |
|Baldur's Gate     |BioWare               |Interplay Entertainment|
|Baldur's Gate     |BioWare               |Interplay Entertainment|
|Neverwinter Nights|BioWare               |Infogrames / Atari     |
+------------------+----------------------+-----------------------+



In [None]:
games.distinct().orderBy(col('Developer')).limit(10).show(truncate=False)

+------------------+----------------------+-----------------------+
|Series            |Developer             |Publisher              |
+------------------+----------------------+-----------------------+
|Duke Nukem        |3D Realms             |GT Interactive Software|
|NULL              |Amanita Design        |Amanita Design         |
|Guild Wars        |ArenaNet              |NCsoft                 |
|NULL              |Arrowhead Game Studios|Paradox Interactive    |
|Sacred            |Ascaron               |Encore                 |
|The Patrician     |Ascaron               |Encore                 |
|Neverwinter Nights|BioWare               |Infogrames / Atari     |
|Baldur's Gate     |BioWare               |Interplay Entertainment|
|Diablo            |Blizzard Entertainment|Blizzard Entertainment |
|Warcraft          |Blizzard Entertainment|Blizzard Entertainment |
+------------------+----------------------+-----------------------+



In [None]:
games.dropDuplicates().orderBy(col('Developer')).limit(10).show(truncate=False)

+------------------+----------------------+-----------------------+
|Series            |Developer             |Publisher              |
+------------------+----------------------+-----------------------+
|Duke Nukem        |3D Realms             |GT Interactive Software|
|NULL              |Amanita Design        |Amanita Design         |
|Guild Wars        |ArenaNet              |NCsoft                 |
|NULL              |Arrowhead Game Studios|Paradox Interactive    |
|Sacred            |Ascaron               |Encore                 |
|The Patrician     |Ascaron               |Encore                 |
|Neverwinter Nights|BioWare               |Infogrames / Atari     |
|Baldur's Gate     |BioWare               |Interplay Entertainment|
|Diablo            |Blizzard Entertainment|Blizzard Entertainment |
|Warcraft          |Blizzard Entertainment|Blizzard Entertainment |
+------------------+----------------------+-----------------------+



In [None]:
games.dropDuplicates(['Series']).orderBy(col('Developer')).limit(10).show(truncate=False)

+------------------+----------------------+-----------------------+
|Series            |Developer             |Publisher              |
+------------------+----------------------+-----------------------+
|Duke Nukem        |3D Realms             |GT Interactive Software|
|NULL              |Amanita Design        |Amanita Design         |
|Guild Wars        |ArenaNet              |NCsoft                 |
|The Patrician     |Ascaron               |Encore                 |
|Sacred            |Ascaron               |Encore                 |
|Baldur's Gate     |BioWare               |Interplay Entertainment|
|Neverwinter Nights|BioWare               |Infogrames / Atari     |
|StarCraft         |Blizzard Entertainment|Blizzard Entertainment |
|Warcraft          |Blizzard Entertainment|Blizzard Entertainment |
|Diablo            |Blizzard Entertainment|Blizzard Entertainment |
+------------------+----------------------+-----------------------+



## 6.2. Filtrowanie danych cz. 1

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second
)

from datetime import datetime

spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
books = spark.read.csv('data/best_selling_books.csv', header=True)

In [None]:
books.limit(5).show()

+--------------------+--------------------+-----------------+---------------+-----------------------------+------------------+
|                Book|           Author(s)|Original language|First published|Approximate sales in millions|             Genre|
+--------------------+--------------------+-----------------+---------------+-----------------------------+------------------+
|A Tale of Two Cities|     Charles Dickens|          English|           1859|                          200|Historical fiction|
|The Little Prince...|Antoine de Saint-...|           French|           1943|                          200|           Novella|
|Harry Potter and ...|       J. K. Rowling|          English|           1997|                          120|           Fantasy|
|And Then There We...|     Agatha Christie|          English|           1939|                          100|           Mystery|
|Dream of the Red ...|          Cao Xueqin|          Chinese|           1791|                          100|    

In [None]:
books.filter(
    col('Original language') == 'Portuguese'

).show()

+--------------------+------------+-----------------+---------------+-----------------------------+-------+
|                Book|   Author(s)|Original language|First published|Approximate sales in millions|  Genre|
+--------------------+------------+-----------------+---------------+-----------------------------+-------+
|The Alchemist (O ...|Paulo Coelho|       Portuguese|           1988|                           65|Fantasy|
+--------------------+------------+-----------------+---------------+-----------------------------+-------+



In [None]:
books.filter(
    col('First published') > 2010

).show(truncate=False)

+-----------------------------------+--------------+-----------------+---------------+-----------------------------+-----------------------------+
|Book                               |Author(s)     |Original language|First published|Approximate sales in millions|Genre                        |
+-----------------------------------+--------------+-----------------+---------------+-----------------------------+-----------------------------+
|The Fault in Our Stars             |John Green    |English          |2012           |23                           |Young adult romantic novel   |
|The Girl on the Train              |Paula Hawkins |English          |2015           |23                           |Thriller                     |
|Gone Girl                          |Gillian Flynn |English          |2012           |20                           |Crime thriller novel         |
|Where the Crawdads Sing            |Delia Owens   |English          |2018           |18                           |Co

In [None]:
emp.limit(5).show()

+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+
| id|    first|     last|              skills|salary|                role|status| hire_date|     hire_timestamp|
+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+
|  1|     Adam|    Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|
|  2|      Jan| Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-10 16:00:00|
|  3|  Dominik|     Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|
|  4|      Ewa|   Piksel|[SQL, Python, Pan...|  4100|{level -> 1, posi...| Fired|2023-06-10|2023-06-01 11:00:00|
|  5|Krzysztof|Zależność|[Git, CI/CD, Docker]|  8000|{level -> 2, posi...|Active|2023-06-15|2023-06-15 11:30:00|
+---+---------+---------+--------------------+------+--------------------+------+----------+----

Zapis funkcyjny

In [None]:
emp.filter(
    size(col('skills')) > 3
).show(truncate=False)

+---+-----+---------+---------------------------------------+------+-----------------------------------------+------+----------+-------------------+
|id |first|last     |skills                                 |salary|role                                     |status|hire_date |hire_timestamp     |
+---+-----+---------+---------------------------------------+------+-----------------------------------------+------+----------+-------------------+
|2  |Jan  |Kowalski |[SQL, Java, Azure, Spring]             |8000  |{level -> 3, position -> Java Developer} |Active|2023-05-10|2023-05-10 16:00:00|
|6  |Ewa  |Kierownik|[Azure, GCP, AWS, Linux]               |12500 |{level -> 2, position -> Cloud Architect}|Fired |2023-06-20|2023-06-20 12:00:00|
|7  |Adam |Kowalski |[Git, CI/CD, Docker, Linux, Kubernetes]|10500 |{level -> 3, position -> DevOps}         |New   |2023-01-20|2023-01-20 09:00:00|
+---+-----+---------+---------------------------------------+------+--------------------------------------

Zapis SQL

In [None]:
emp.filter(
    "last == 'Kowalski' "
).show()

+---+-----+--------+--------------------+------+--------------------+------+----------+-------------------+
| id|first|    last|              skills|salary|                role|status| hire_date|     hire_timestamp|
+---+-----+--------+--------------------+------+--------------------+------+----------+-------------------+
|  2|  Jan|Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-10 16:00:00|
|  7| Adam|Kowalski|[Git, CI/CD, Dock...| 10500|{level -> 3, posi...|   New|2023-01-20|2023-01-20 09:00:00|
+---+-----+--------+--------------------+------+--------------------+------+----------+-------------------+



In [None]:
emp.filter(
    "hire_date > '2023-05-10' "
).show()

+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+
| id|    first|     last|              skills|salary|                role|status| hire_date|     hire_timestamp|
+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+
|  3|  Dominik|     Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|
|  4|      Ewa|   Piksel|[SQL, Python, Pan...|  4100|{level -> 1, posi...| Fired|2023-06-10|2023-06-01 11:00:00|
|  5|Krzysztof|Zależność|[Git, CI/CD, Docker]|  8000|{level -> 2, posi...|Active|2023-06-15|2023-06-15 11:30:00|
|  6|      Ewa|Kierownik|[Azure, GCP, AWS,...| 12500|{level -> 2, posi...| Fired|2023-06-20|2023-06-20 12:00:00|
+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+



In [None]:
emp.filter(
    col('hire_date') > datetime(2023, 5, 10)
).show()

+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+
| id|    first|     last|              skills|salary|                role|status| hire_date|     hire_timestamp|
+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+
|  3|  Dominik|     Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|
|  4|      Ewa|   Piksel|[SQL, Python, Pan...|  4100|{level -> 1, posi...| Fired|2023-06-10|2023-06-01 11:00:00|
|  5|Krzysztof|Zależność|[Git, CI/CD, Docker]|  8000|{level -> 2, posi...|Active|2023-06-15|2023-06-15 11:30:00|
|  6|      Ewa|Kierownik|[Azure, GCP, AWS,...| 12500|{level -> 2, posi...| Fired|2023-06-20|2023-06-20 12:00:00|
+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+



## 6.3. Filtrowanie danych cz. 2

In [None]:
emp.filter(
    col('role').getItem('position').isin(['DevOps', 'Intern'])
).show(truncate=False)

+---+---------+----------+---------------------------------------+------+--------------------------------+------+----------+-------------------+
|id |first    |last      |skills                                 |salary|role                            |status|hire_date |hire_timestamp     |
+---+---------+----------+---------------------------------------+------+--------------------------------+------+----------+-------------------+
|5  |Krzysztof|Zależność |[Git, CI/CD, Docker]                   |8000  |{level -> 2, position -> DevOps}|Active|2023-06-15|2023-06-15 11:30:00|
|7  |Adam     |Kowalski  |[Git, CI/CD, Docker, Linux, Kubernetes]|10500 |{level -> 3, position -> DevOps}|New   |2023-01-20|2023-01-20 09:00:00|
|8  |Dominika |Praktyczna|[SQL, Java, Python]                    |3000  |{level -> 0, position -> Intern}|NULL  |2023-01-30|2023-01-30 07:00:00|
|9  |Jan      |Praktyczny|[SQL, Java, Python]                    |3000  |{level -> 0, position -> Intern}|Active|2023-03-20|2023-0

In [None]:
emp.filter(
    col('salary').between(3000, 4000)
).show()

+---+--------+----------+--------------------+------+--------------------+------+----------+-------------------+
| id|   first|      last|              skills|salary|                role|status| hire_date|     hire_timestamp|
+---+--------+----------+--------------------+------+--------------------+------+----------+-------------------+
|  1|    Adam|     Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|
|  3| Dominik|      Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|
|  8|Dominika|Praktyczna| [SQL, Java, Python]|  3000|{level -> 0, posi...|  NULL|2023-01-30|2023-01-30 07:00:00|
|  9|     Jan|Praktyczny| [SQL, Java, Python]|  3000|{level -> 0, posi...|Active|2023-03-20|2023-03-20 11:45:00|
+---+--------+----------+--------------------+------+--------------------+------+----------+-------------------+



In [None]:
emp.filter(
    "salary between 3000 and 4000"
).show()

+---+--------+----------+--------------------+------+--------------------+------+----------+-------------------+
| id|   first|      last|              skills|salary|                role|status| hire_date|     hire_timestamp|
+---+--------+----------+--------------------+------+--------------------+------+----------+-------------------+
|  1|    Adam|     Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|
|  3| Dominik|      Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|
|  8|Dominika|Praktyczna| [SQL, Java, Python]|  3000|{level -> 0, posi...|  NULL|2023-01-30|2023-01-30 07:00:00|
|  9|     Jan|Praktyczny| [SQL, Java, Python]|  3000|{level -> 0, posi...|Active|2023-03-20|2023-03-20 11:45:00|
+---+--------+----------+--------------------+------+--------------------+------+----------+-------------------+



In [None]:
emp.filter(
    col('status').isNull()
).show()

+---+--------+----------+--------------------+------+--------------------+------+----------+-------------------+
| id|   first|      last|              skills|salary|                role|status| hire_date|     hire_timestamp|
+---+--------+----------+--------------------+------+--------------------+------+----------+-------------------+
|  1|    Adam|     Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|
|  3| Dominik|      Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|
|  8|Dominika|Praktyczna| [SQL, Java, Python]|  3000|{level -> 0, posi...|  NULL|2023-01-30|2023-01-30 07:00:00|
+---+--------+----------+--------------------+------+--------------------+------+----------+-------------------+



In [None]:
emp.filter(
    ~col('status').isNull()
).show()

+---+---------+----------+--------------------+------+--------------------+------+----------+-------------------+
| id|    first|      last|              skills|salary|                role|status| hire_date|     hire_timestamp|
+---+---------+----------+--------------------+------+--------------------+------+----------+-------------------+
|  2|      Jan|  Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-10 16:00:00|
|  4|      Ewa|    Piksel|[SQL, Python, Pan...|  4100|{level -> 1, posi...| Fired|2023-06-10|2023-06-01 11:00:00|
|  5|Krzysztof| Zależność|[Git, CI/CD, Docker]|  8000|{level -> 2, posi...|Active|2023-06-15|2023-06-15 11:30:00|
|  6|      Ewa| Kierownik|[Azure, GCP, AWS,...| 12500|{level -> 2, posi...| Fired|2023-06-20|2023-06-20 12:00:00|
|  7|     Adam|  Kowalski|[Git, CI/CD, Dock...| 10500|{level -> 3, posi...|   New|2023-01-20|2023-01-20 09:00:00|
|  9|      Jan|Praktyczny| [SQL, Java, Python]|  3000|{level -> 0, posi...|Active|2023-0

## 6.4. Łączanie warunków

In [None]:
books = spark.read.csv('data/best_selling_books.csv', header=True)

In [None]:
books.limit(3).show(truncate=False)

+----------------------------------------+------------------------+-----------------+---------------+-----------------------------+------------------+
|Book                                    |Author(s)               |Original language|First published|Approximate sales in millions|Genre             |
+----------------------------------------+------------------------+-----------------+---------------+-----------------------------+------------------+
|A Tale of Two Cities                    |Charles Dickens         |English          |1859           |200                          |Historical fiction|
|The Little Prince (Le Petit Prince)     |Antoine de Saint-Exupéry|French           |1943           |200                          |Novella           |
|Harry Potter and the Philosopher's Stone|J. K. Rowling           |English          |1997           |120                          |Fantasy           |
+----------------------------------------+------------------------+-----------------+---------

In [None]:
books.filter(
    (col('First published') < 2000 )
    &
    (col('Original language') != 'English')
).limit(5).show()

+--------------------+--------------------+-----------------+---------------+-----------------------------+-------------+
|                Book|           Author(s)|Original language|First published|Approximate sales in millions|        Genre|
+--------------------+--------------------+-----------------+---------------+-----------------------------+-------------+
|The Little Prince...|Antoine de Saint-...|           French|           1943|                          200|      Novella|
|Dream of the Red ...|          Cao Xueqin|          Chinese|           1791|                          100|  Family saga|
|Vardi Wala Gunda ...|  Ved Prakash Sharma|            Hindi|           1992|                           80|    Detective|
|The Alchemist (O ...|        Paulo Coelho|       Portuguese|           1988|                           65|      Fantasy|
|One Hundred Years...|Gabriel García Má...|          Spanish|           1967|                           50|Magic realism|
+--------------------+--

In [None]:
books.filter(
    (col('Genre') == 'Detective')
    |
    (col('First published') > 2000)
).limit(5).show()


+--------------------+------------------+-----------------+---------------+-----------------------------+----------------+
|                Book|         Author(s)|Original language|First published|Approximate sales in millions|           Genre|
+--------------------+------------------+-----------------+---------------+-----------------------------+----------------+
|Vardi Wala Gunda ...|Ved Prakash Sharma|            Hindi|           1992|                           80|       Detective|
|   The Da Vinci Code|         Dan Brown|          English|           2003|                           80|Mystery thriller|
|Harry Potter and ...|     J. K. Rowling|          English|           2003|                           65|         Fantasy|
|Harry Potter and ...|     J. K. Rowling|          English|           2005|                           65|         Fantasy|
|Harry Potter and ...|     J. K. Rowling|          English|           2007|                           65|         Fantasy|
+---------------

In [None]:
books.filter(
    ( (col('Genre') == 'Detective')
    |
    (col('First published') > 2000) )
    &
    (col('Original language') == 'Hindi')
).limit(5).show()

+--------------------+------------------+-----------------+---------------+-----------------------------+---------+
|                Book|         Author(s)|Original language|First published|Approximate sales in millions|    Genre|
+--------------------+------------------+-----------------+---------------+-----------------------------+---------+
|Vardi Wala Gunda ...|Ved Prakash Sharma|            Hindi|           1992|                           80|Detective|
+--------------------+------------------+-----------------+---------------+-----------------------------+---------+



# 7. Grupowanie danych

## 7.1. Funkcje agregujące/alias

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second,
    count, min, max, avg, sum
)
from datetime import datetime

In [None]:
spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("first", StringType(), False),
        StructField("last", StringType(), False),
        StructField("skills", ArrayType(StringType()), False),
        StructField("salary", IntegerType(), False),
        StructField("role", MapType(StringType(), StringType()), False),
        StructField("status", StringType(), True),
        StructField("hire_date", DateType(), True),
        StructField("hire_timestamp", TimestampType(), True),
        StructField("country_code", StringType(), True)
    ]
)

emp = spark.createDataFrame(
    [
        (1, "Adam", "Nowak", ["SQL", "Java", "GCP"], 3500, {"position": "Java Developer", "level": "1"}, None,
         datetime(2023, 5, 1), datetime(2023, 5, 1, 12, 0, 0), "PL"),
        (2, "Jan", "Kowalski", ["SQL", "Java", "Azure", "Spring"], 8000, {"position": "Java Developer", "level": "3"}, "Active",
         datetime(2023, 5, 10), datetime(2023, 5, 1, 16, 0, 0), "PL"),
        (3, "Dominik", "Bajt", ["Python", "MongoDB", "Redis"], 4000, {"position": "Data Developer", "level": "1"}, None,
         datetime(2023, 5, 15), datetime(2023, 5, 15, 8, 0, 0), "GB"),
        (4, "Ewa", "Piksel", ["SQL", "Python", "Pandas", ], 4100, {"position": "Data Scientist", "level": "1"}, "Fired",
         datetime(2023, 6, 10), datetime(2023, 6, 1, 11, 0, 0), "DE"),
        (5, "Krzysztof", "Zależność", ["Git", "CI/CD", "Docker"], 8000, {"position": "DevOps", "level": "2"}, "Active",
         datetime(2023, 6, 15), datetime(2023, 6, 15, 11, 30, 0), "DE"),
        (6, "Ewa", "Kierownik", ["Azure", "GCP", "AWS", "Linux"], 12500, {"position": "Cloud Architect", "level": "2"}, "Fired",
         datetime(2023, 6, 20), datetime(2023, 6, 20, 12, 0), "CZ"),
        (7, "Adam", "Kowalski", ["Git", "CI/CD", "Docker", "Linux", "Kubernetes"], 10500, {"position": "DevOps", "level": "3"}, "New",
         datetime(2023, 1, 20), datetime(2023, 1, 20, 9, 0), "CZ"),
        (8, "Dominika", "Praktyczna", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, None,
         datetime(2023, 1, 30), datetime(2023, 1, 30, 7, 0), "GB"),
        (9, "Jan", "Praktyczny", ["SQL", "Java", "Python"], 3000, {"position": "Intern", "level": "0"}, "Active",
         datetime(2023, 3, 20), datetime(2023, 3, 20, 11, 45), "AT"),
        (10, "Mikołaj", "Sobieski", ["Python", "Django", "Flask"], 7500, {"position": "Python Developer", "level": "1"}, "New",
         datetime(2023, 1, 20), datetime(2023, 1, 20, 8, 35), "AT")
    ],
    schema
)

countries_schema = StructType(
    [
    StructField("country", StringType(), True),
    StructField("country_code", StringType(), True),
    StructField("country_code_three", StringType(), True)
    ]
)

In [None]:
countries = spark.read.csv("data/country-codes.csv", header=False, sep=";", schema=countries_schema)

countries.limit(5).show(truncate=False)

+--------------+------------+------------------+
|country       |country_code|country_code_three|
+--------------+------------+------------------+
|Afghanistan   |AF          |AFG               |
|Åland Islands |AX          |ALA               |
|Albania       |AL          |ALB               |
|Algeria       |DZ          |DZA               |
|American Samoa|AS          |ASM               |
+--------------+------------+------------------+



In [None]:
emp.limit(5).show()

+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+------------+
| id|    first|     last|              skills|salary|                role|status| hire_date|     hire_timestamp|country_code|
+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+------------+
|  1|     Adam|    Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|          PL|
|  2|      Jan| Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-01 16:00:00|          PL|
|  3|  Dominik|     Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|          GB|
|  4|      Ewa|   Piksel|[SQL, Python, Pan...|  4100|{level -> 1, posi...| Fired|2023-06-10|2023-06-01 11:00:00|          DE|
|  5|Krzysztof|Zależność|[Git, CI/CD, Docker]|  8000|{level -> 2, posi...|Active|2023-06-15|2023-06-15 11:30:00|      

In [None]:
countries.agg(
    count("*")
).show()

+--------+
|count(1)|
+--------+
|     246|
+--------+



In [None]:
countries.agg(
    count("*").alias('liczba wierszy')
).show()

+--------------+
|liczba wierszy|
+--------------+
|           246|
+--------------+



In [None]:
emp.agg(
    sum(col('salary')).alias('wynagrodzenie').alias('sum solary'),
    max(col('salary')).alias('wynagrodzenie').alias('max solary'),
    min(col('salary')).alias('wynagrodzenie').alias('min solary'),
    avg(col('salary')).alias('wynagrodzenie').alias('avg solary')

).show()

+----------+----------+----------+----------+
|sum solary|max solary|min solary|avg solary|
+----------+----------+----------+----------+
|     64100|     12500|      3000|    6410.0|
+----------+----------+----------+----------+



## 7.2. Grupowanie danych

In [None]:
emp.groupBy(
    col('role').getItem('position')
).agg(
    avg(col('salary'))
).show()

+----------------+-----------+
|  role[position]|avg(salary)|
+----------------+-----------+
|  Java Developer|     5750.0|
|  Data Scientist|     4100.0|
|  Data Developer|     4000.0|
| Cloud Architect|    12500.0|
|          DevOps|     9250.0|
|Python Developer|     7500.0|
|          Intern|     3000.0|
+----------------+-----------+



In [None]:
emp.groupBy(
    col('role').getItem('position'),
    col('first')
).agg(
    avg(col('salary'))
).show()

+----------------+---------+-----------+
|  role[position]|    first|avg(salary)|
+----------------+---------+-----------+
|  Java Developer|      Jan|     8000.0|
|  Java Developer|     Adam|     3500.0|
|  Data Scientist|      Ewa|     4100.0|
|  Data Developer|  Dominik|     4000.0|
| Cloud Architect|      Ewa|    12500.0|
|          DevOps|Krzysztof|     8000.0|
|          DevOps|     Adam|    10500.0|
|          Intern|      Jan|     3000.0|
|Python Developer|  Mikołaj|     7500.0|
|          Intern| Dominika|     3000.0|
+----------------+---------+-----------+



In [None]:
emp.limit(3).show()

+---+-------+--------+--------------------+------+--------------------+------+----------+-------------------+------------+
| id|  first|    last|              skills|salary|                role|status| hire_date|     hire_timestamp|country_code|
+---+-------+--------+--------------------+------+--------------------+------+----------+-------------------+------------+
|  1|   Adam|   Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|          PL|
|  2|    Jan|Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-01 16:00:00|          PL|
|  3|Dominik|    Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|          GB|
+---+-------+--------+--------------------+------+--------------------+------+----------+-------------------+------------+



In [None]:
emp.withColumn('skill', explode(col('skills'))).limit(5).show()

+---+-----+--------+--------------------+------+--------------------+------+----------+-------------------+------------+-----+
| id|first|    last|              skills|salary|                role|status| hire_date|     hire_timestamp|country_code|skill|
+---+-----+--------+--------------------+------+--------------------+------+----------+-------------------+------------+-----+
|  1| Adam|   Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|          PL|  SQL|
|  1| Adam|   Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|          PL| Java|
|  1| Adam|   Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|          PL|  GCP|
|  2|  Jan|Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-01 16:00:00|          PL|  SQL|
|  2|  Jan|Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-01 16:00:00|    

In [None]:
(
    emp
    .withColumn('skill', explode(col('skills')))
    .groupBy(col('skill'))
    .agg(count('*'))
    .limit(10).show()
)

+-------+--------+
|  skill|count(1)|
+-------+--------+
|  Azure|       2|
| Spring|       1|
|    GCP|       2|
|    SQL|       5|
|   Java|       4|
|MongoDB|       1|
|  Redis|       1|
| Pandas|       1|
| Python|       5|
| Docker|       2|
+-------+--------+



## 7.3. JOIN

In [None]:
emp.show(2)
countries.show(2)

+---+-----+--------+--------------------+------+--------------------+------+----------+-------------------+------------+
| id|first|    last|              skills|salary|                role|status| hire_date|     hire_timestamp|country_code|
+---+-----+--------+--------------------+------+--------------------+------+----------+-------------------+------------+
|  1| Adam|   Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|          PL|
|  2|  Jan|Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-01 16:00:00|          PL|
+---+-----+--------+--------------------+------+--------------------+------+----------+-------------------+------------+
only showing top 2 rows

+-------------+------------+------------------+
|      country|country_code|country_code_three|
+-------------+------------+------------------+
|  Afghanistan|          AF|               AFG|
|Åland Islands|          AX|               ALA|
+--------

In [None]:
emp_and_country = emp.join(
    countries,
    emp.country_code == countries.country_code,
    'inner'
)

In [None]:
emp_and_country.limit(5).show()

+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+------------+--------------+------------+------------------+
| id|    first|     last|              skills|salary|                role|status| hire_date|     hire_timestamp|country_code|       country|country_code|country_code_three|
+---+---------+---------+--------------------+------+--------------------+------+----------+-------------------+------------+--------------+------------+------------------+
|  1|     Adam|    Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|          PL|        Poland|          PL|               POL|
|  2|      Jan| Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-01 16:00:00|          PL|        Poland|          PL|               POL|
|  3|  Dominik|     Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|          GB|United Kin

In [None]:
emp_and_country = emp.join(
    countries,
    emp["country_code"] == countries["country_code"],
    "inner"
)

emp_and_country.show()

+---+---------+----------+--------------------+------+--------------------+------+----------+-------------------+------------+--------------+------------+------------------+
| id|    first|      last|              skills|salary|                role|status| hire_date|     hire_timestamp|country_code|       country|country_code|country_code_three|
+---+---------+----------+--------------------+------+--------------------+------+----------+-------------------+------------+--------------+------------+------------------+
|  1|     Adam|     Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|          PL|        Poland|          PL|               POL|
|  2|      Jan|  Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-01 16:00:00|          PL|        Poland|          PL|               POL|
|  3|  Dominik|      Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|          GB|Unit

In [None]:
emp_and_country = emp.join(
    countries,
    on='country_code',
    how='inner'
)

emp_and_country.show()

+------------+---+---------+----------+--------------------+------+--------------------+------+----------+-------------------+--------------+------------------+
|country_code| id|    first|      last|              skills|salary|                role|status| hire_date|     hire_timestamp|       country|country_code_three|
+------------+---+---------+----------+--------------------+------+--------------------+------+----------+-------------------+--------------+------------------+
|          PL|  1|     Adam|     Nowak|    [SQL, Java, GCP]|  3500|{level -> 1, posi...|  NULL|2023-05-01|2023-05-01 12:00:00|        Poland|               POL|
|          PL|  2|      Jan|  Kowalski|[SQL, Java, Azure...|  8000|{level -> 3, posi...|Active|2023-05-10|2023-05-01 16:00:00|        Poland|               POL|
|          GB|  3|  Dominik|      Bajt|[Python, MongoDB,...|  4000|{level -> 1, posi...|  NULL|2023-05-15|2023-05-15 08:00:00|United Kingdom|               GBR|
|          DE|  4|      Ewa|    Pi

## 7.4. Union/UnionAll

In [None]:
d1 = spark.read.csv("data/Games.csv", header=True).select(
    lit("Games").alias("Type"), col("Name"), col("Developer")
).orderBy(col("Developer")).limit(10)

d2 = spark.read.csv("data/best_selling_books.csv", header=True).select(
    lit("Book").alias("Type"), col("Book"), col("Author(s)")
).limit(10)

In [None]:
d1.show()
d2.show()

+-----+--------------------+--------------------+
| Type|                Name|           Developer|
+-----+--------------------+--------------------+
|Games|       Duke Nukem 3D|           3D Realms|
|Games|         Machinarium|      Amanita Design|
|Games|          Guild Wars|            ArenaNet|
|Games|        Guild Wars 2|            ArenaNet|
|Games|             Magicka|Arrowhead Game St...|
|Games|Patrician III: Ri...|             Ascaron|
|Games|              Sacred|             Ascaron|
|Games|       Baldur's Gate|             BioWare|
|Games|Baldur's Gate II:...|             BioWare|
|Games|  Neverwinter Nights|             BioWare|
+-----+--------------------+--------------------+

+----+--------------------+--------------------+
|Type|                Book|           Author(s)|
+----+--------------------+--------------------+
|Book|A Tale of Two Cities|     Charles Dickens|
|Book|The Little Prince...|Antoine de Saint-...|
|Book|Harry Potter and ...|       J. K. Rowling|
|Book

In [None]:
df3 = d1.union(d2)
df3.show()

+-----+--------------------+--------------------+
| Type|                Name|           Developer|
+-----+--------------------+--------------------+
|Games|       Duke Nukem 3D|           3D Realms|
|Games|         Machinarium|      Amanita Design|
|Games|          Guild Wars|            ArenaNet|
|Games|        Guild Wars 2|            ArenaNet|
|Games|             Magicka|Arrowhead Game St...|
|Games|Patrician III: Ri...|             Ascaron|
|Games|              Sacred|             Ascaron|
|Games|       Baldur's Gate|             BioWare|
|Games|Baldur's Gate II:...|             BioWare|
|Games|  Neverwinter Nights|             BioWare|
| Book|A Tale of Two Cities|     Charles Dickens|
| Book|The Little Prince...|Antoine de Saint-...|
| Book|Harry Potter and ...|       J. K. Rowling|
| Book|And Then There We...|     Agatha Christie|
| Book|Dream of the Red ...|          Cao Xueqin|
| Book|          The Hobbit|    J. R. R. Tolkien|
| Book|The Lion, the Wit...|         C. S. Lewis|


In [None]:
df4 = d1.unionAll(d2)
df4.show()

+-----+--------------------+--------------------+
| Type|                Name|           Developer|
+-----+--------------------+--------------------+
|Games|       Duke Nukem 3D|           3D Realms|
|Games|         Machinarium|      Amanita Design|
|Games|          Guild Wars|            ArenaNet|
|Games|        Guild Wars 2|            ArenaNet|
|Games|             Magicka|Arrowhead Game St...|
|Games|Patrician III: Ri...|             Ascaron|
|Games|              Sacred|             Ascaron|
|Games|       Baldur's Gate|             BioWare|
|Games|Baldur's Gate II:...|             BioWare|
|Games|  Neverwinter Nights|             BioWare|
| Book|A Tale of Two Cities|     Charles Dickens|
| Book|The Little Prince...|Antoine de Saint-...|
| Book|Harry Potter and ...|       J. K. Rowling|
| Book|And Then There We...|     Agatha Christie|
| Book|Dream of the Red ...|          Cao Xueqin|
| Book|          The Hobbit|    J. R. R. Tolkien|
| Book|The Lion, the Wit...|         C. S. Lewis|


# 8. Mapowanie i funkcje użytkownika

## 8.1. Funkcje użytkownika UDF

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second,
    count, min, max, avg, sum, udf, when
)
from datetime import datetime

In [None]:
spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
games = spark.read.csv("data/Games.csv", header=True).select(col("Series"), col("Developer"), col("Publisher"))

In [None]:
games.limit(5).show(truncate=False)

+---------+----------------------+----------------------+
|Series   |Developer             |Publisher             |
+---------+----------------------+----------------------+
|NULL     |PUBG Studios          |Krafton               |
|Minecraft|Mojang Studios        |Mojang Studios        |
|Diablo   |Blizzard Entertainment|Blizzard Entertainment|
|NULL     |Facepunch Studios     |Valve                 |
|NULL     |Re-Logic              |Re-Logic              |
+---------+----------------------+----------------------+



In [None]:
def to_upper(input_str: str) -> str:
    return input_str.upper()

In [None]:
upper_case_udf = udf(
    lambda x: to_upper(x),
    StringType()
)

In [None]:
games.withColumn('upper_developer', upper_case_udf(col('Developer'))).show()

+--------------------+--------------------+--------------------+--------------------+
|              Series|           Developer|           Publisher|     upper_developer|
+--------------------+--------------------+--------------------+--------------------+
|                NULL|        PUBG Studios|             Krafton|        PUBG STUDIOS|
|           Minecraft|      Mojang Studios|      Mojang Studios|      MOJANG STUDIOS|
|              Diablo|Blizzard Entertai...|Blizzard Entertai...|BLIZZARD ENTERTAI...|
|                NULL|   Facepunch Studios|               Valve|   FACEPUNCH STUDIOS|
|                NULL|            Re-Logic|            Re-Logic|            RE-LOGIC|
|            Warcraft|Blizzard Entertai...|Blizzard Entertai...|BLIZZARD ENTERTAI...|
|           Half-Life|               Valve|     Valve (digital)|               VALVE|
|         The Witcher|      CD Projekt Red|          CD Projekt|      CD PROJEKT RED|
|           StarCraft|Blizzard Entertai...|Blizzard En

## 8.2. Funkcja when

In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType
from pyspark.sql.functions import (
    col, size, lit, explode,
    concat, concat_ws, substring,
    datediff, date_add, date_sub,
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear,
    hour, minute, second,
    count, min, max, avg, sum, udf, when
)
from datetime import datetime

In [None]:
spark = SparkSession.builder.appName("spark").getOrCreate()

In [None]:
schema = StructType(
    [
        StructField("first", StringType(), False),
        StructField("last", StringType(), False),
        StructField("salary", IntegerType(), False),
    ]
)

emp = spark.createDataFrame(
    [
        ("Adam", "Nowak", 3500),
        ("Jan", "Kowalski", 8000),
        ("Dominik", "Bajt", 4000),
        ("Ewa", "Piksel", 4100),
        ("Krzysztof", "Zależność", 8000),
        ("Ewa", "Kierownik", 12500),
        ("Adam", "Kowalski", 10500),
        ("Dominika", "Praktyczna", 3000),
        ("Jan", "Praktyczny", 3000),
        ("Mikołaj", "Sobieski", 7500)
    ],
    schema
)

In [None]:
emp.show()

+---------+----------+------+
|    first|      last|salary|
+---------+----------+------+
|     Adam|     Nowak|  3500|
|      Jan|  Kowalski|  8000|
|  Dominik|      Bajt|  4000|
|      Ewa|    Piksel|  4100|
|Krzysztof| Zależność|  8000|
|      Ewa| Kierownik| 12500|
|     Adam|  Kowalski| 10500|
| Dominika|Praktyczna|  3000|
|      Jan|Praktyczny|  3000|
|  Mikołaj|  Sobieski|  7500|
+---------+----------+------+



In [None]:
(
    emp.withColumn('salary_tier',
                   when(col('salary') >= 10000, 'I Tier')
                   .when(col('salary') >= 8000, 'II Tier')
                   .otherwise('III Tier')
                  )
).show()

+---------+----------+------+-----------+
|    first|      last|salary|salary_tier|
+---------+----------+------+-----------+
|     Adam|     Nowak|  3500|   III Tier|
|      Jan|  Kowalski|  8000|    II Tier|
|  Dominik|      Bajt|  4000|   III Tier|
|      Ewa|    Piksel|  4100|   III Tier|
|Krzysztof| Zależność|  8000|    II Tier|
|      Ewa| Kierownik| 12500|     I Tier|
|     Adam|  Kowalski| 10500|     I Tier|
| Dominika|Praktyczna|  3000|   III Tier|
|      Jan|Praktyczny|  3000|   III Tier|
|  Mikołaj|  Sobieski|  7500|   III Tier|
+---------+----------+------+-----------+



## 8.3. Funkcja map

In [None]:
emp.show()

+---------+----------+------+
|    first|      last|salary|
+---------+----------+------+
|     Adam|     Nowak|  3500|
|      Jan|  Kowalski|  8000|
|  Dominik|      Bajt|  4000|
|      Ewa|    Piksel|  4100|
|Krzysztof| Zależność|  8000|
|      Ewa| Kierownik| 12500|
|     Adam|  Kowalski| 10500|
| Dominika|Praktyczna|  3000|
|      Jan|Praktyczny|  3000|
|  Mikołaj|  Sobieski|  7500|
+---------+----------+------+



In [None]:
new_schema = StructType(
    [
        StructField('body', MapType(StringType(), StringType()), False)
    ]
)

In [None]:
def map_to_json(r):
    return Row(
        body={
            'first': r.first,
            'last': r.last,
            'year_salary': str(r.salary * 12)
        }
    )

In [None]:
body_emp = emp.rdd.map(lambda r: map_to_json(r)).toDF(new_schema)
body_emp.show(truncate=False)

+-------------------------------------------------------------+
|body                                                         |
+-------------------------------------------------------------+
|{year_salary -> 42000, last -> Nowak, first -> Adam}         |
|{year_salary -> 96000, last -> Kowalski, first -> Jan}       |
|{year_salary -> 48000, last -> Bajt, first -> Dominik}       |
|{year_salary -> 49200, last -> Piksel, first -> Ewa}         |
|{year_salary -> 96000, last -> Zależność, first -> Krzysztof}|
|{year_salary -> 150000, last -> Kierownik, first -> Ewa}     |
|{year_salary -> 126000, last -> Kowalski, first -> Adam}     |
|{year_salary -> 36000, last -> Praktyczna, first -> Dominika}|
|{year_salary -> 36000, last -> Praktyczny, first -> Jan}     |
|{year_salary -> 90000, last -> Sobieski, first -> Mikołaj}   |
+-------------------------------------------------------------+



## 8.4. Funkcja flatMap

In [None]:
def map_to_json_with_currency(r):
    return [
        Row(body={ 'first': r.first, 'last': r.last, 'year_salary': str(r.salary * 12), 'currency': 'PLN' }),
        Row(body={ 'first': r.first, 'last': r.last, 'year_salary': str(r.salary * 12 / 4.35), 'currency': 'EUR' })
    ]

In [None]:
body_emp = emp.rdd.flatMap(lambda r: map_to_json_with_currency(r)).toDF(new_schema)
body_emp.show(truncate=False)

+------------------------------------------------------------------------------------------+
|body                                                                                      |
+------------------------------------------------------------------------------------------+
|{year_salary -> 42000, currency -> PLN, last -> Nowak, first -> Adam}                     |
|{year_salary -> 9655.172413793105, currency -> EUR, last -> Nowak, first -> Adam}         |
|{year_salary -> 96000, currency -> PLN, last -> Kowalski, first -> Jan}                   |
|{year_salary -> 22068.96551724138, currency -> EUR, last -> Kowalski, first -> Jan}       |
|{year_salary -> 48000, currency -> PLN, last -> Bajt, first -> Dominik}                   |
|{year_salary -> 11034.48275862069, currency -> EUR, last -> Bajt, first -> Dominik}       |
|{year_salary -> 49200, currency -> PLN, last -> Piksel, first -> Ewa}                     |
|{year_salary -> 11310.344827586208, currency -> EUR, last -> Piksel, 

# 9. Zapisywanie danych do pliku

## 9.1. Omówienie formatów danych

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, DateType, TimestampType

In [None]:
spark = SparkSession.builder.appName("spark").getOrCreate()

#### Formaty danych (najpopularniejsze dla PySpark)

1. csv - czytelny dla użytkownika, prosty w implementacji, zapis w postaci łańcuchów znakowych
2. json - zapis klucz:wartość, popularny format JSON,
3. parquet - format kolumnowy, "write once, read many", zapis binarny, przechowuje również schemat danych
4. avro - format o wysokiej kompresji, dobry do archiwizacji danych, zapis binarny, przechowuje schemat

## 9.2. Zapis do pliku

In [None]:
schema = StructType(
    [
        StructField("first", StringType(), False),
        StructField("last", StringType(), False),
        StructField("salary", IntegerType(), False),
    ]
)

emp = spark.createDataFrame(
    [
        ("Adam", "Nowak", 3500),
        ("Jan", "Kowalski", 8000),
        ("Dominik", "Bajt", 4000),
        ("Ewa", "Piksel", 4100),
        ("Krzysztof", "Zależność", 8000),
        ("Ewa", "Kierownik", 12500),
        ("Adam", "Kowalski", 10500),
        ("Dominika", "Praktyczna", 3000),
        ("Jan", "Praktyczny", 3000),
        ("Mikołaj", "Sobieski", 7500)
    ],
    schema
)

In [None]:
emp.write.mode("overwrite").parquet("data/emp.parquet")

In [None]:
spark.read.csv('data/import', header=True).show()

+----+----+
|col1|col2|
+----+----+
|   1|   1|
|   2|   2|
|   3|   3|
|   4|   4|
|   4|   4|
|   5|   5|
|   6|   6|
|   7|   7|
+----+----+



## 9.3. Spark SQL

In [None]:
emp.createOrReplaceTempView('widok')

In [None]:
spark.sql(
    "SELECT first, count(*) as cnt FROM widok GROUP BY first order by cnt desc"
).show()

+---------+---+
|    first|cnt|
+---------+---+
|     Adam|  2|
|      Jan|  2|
|      Ewa|  2|
|  Dominik|  1|
|Krzysztof|  1|
|  Mikołaj|  1|
| Dominika|  1|
+---------+---+



In [None]:
spark.sql(
    "SELECT * FROM widok"
).show()

+---------+----------+------+
|    first|      last|salary|
+---------+----------+------+
|     Adam|     Nowak|  3500|
|      Jan|  Kowalski|  8000|
|  Dominik|      Bajt|  4000|
|      Ewa|    Piksel|  4100|
|Krzysztof| Zależność|  8000|
|      Ewa| Kierownik| 12500|
|     Adam|  Kowalski| 10500|
| Dominika|Praktyczna|  3000|
|      Jan|Praktyczny|  3000|
|  Mikołaj|  Sobieski|  7500|
+---------+----------+------+

