In [5]:
from pyspark.sql import SparkSession

In [None]:
# Build (or reuse) the SparkSession that every later cell depends on.
# getOrCreate() returns an already-active session if there is one.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

In [8]:
# Load the JSON file; with no explicit schema Spark infers the column types.
df = spark.read.json(path='people.json')

In [10]:
# Print up to 20 rows (the show() default) as a text table.
df.show(n=20)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [13]:
# Print the schema Spark inferred from the JSON file (column names, types, nullability).
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [19]:
# Column names, read straight from the DataFrame's schema fields (same list as df.columns).
[field.name for field in df.schema.fields]

['age', 'name']

In [21]:
# Summary statistics (count, mean, stddev, min, max) for every column.
(
    df.describe()
    .show()
)

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



RĘCZNE USTAWIANIE SCHEMATU RAMKI DANYCH

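Schemat (schema) — aby wykonywać niektóre operacje na zbiorze danych, schemat musi być poprawny, tzn. Spark musi wiedzieć, jakiego typu są kolumny (np. czy są ciągami znaków, czy liczbami całkowitymi) oraz czy mogą zawierać wartości NULL. Przy odczycie bez podanego schematu Spark sam go wnioskuje — powyżej kolumna age została odczytana jako long; poniżej ustawiamy schemat ręcznie, tak aby age miała typ integer.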

In [24]:
from pyspark.sql.types import (StructField,StringType,
                               IntegerType,StructType)

In [26]:
# One StructField per column: (column name, Spark data type, nullable flag).
# nullable=True allows the column to hold NULL values (e.g. Michael has no age).
data_schema = [
    StructField('age', IntegerType(), nullable=True),
    StructField('name', StringType(), nullable=True),
]

Trzeci argument `StructField` (`nullable`) ustawiony na `True` oznacza, że pola age i name mogą przyjmować wartość NULL (np. Michael nie ma podanego wieku).

In [27]:
# Wrap the list of fields in a StructType so it can be used as a DataFrame schema.
final_struc = StructType(data_schema)

In [29]:
# Re-read the same file, enforcing the hand-written schema instead of inferring one
# (age is now read as integer rather than the inferred long).
df = spark.read.schema(final_struc).json('people.json')

In [30]:
# Confirm that the manual schema was applied (age should now be integer).
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [31]:
df['age'].__class__  # indexing a DataFrame by name yields a Column object, not data

pyspark.sql.column.Column

In [32]:
df.select(['age'])  # select() returns a new DataFrame containing only the chosen column

DataFrame[age: int]

In [33]:
# Display the contents of the single-column DataFrame.
df.select(['age']).show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [36]:
# head(n) returns a Python list of the first n Row objects;
# indexing it with [0] yields only the first of those rows.
df.head(n=2)[0]

Row(age=None, name='Michael')

In [38]:
df.head(n=2)[0].__class__  # each element returned by head() is a Row object

pyspark.sql.types.Row

In [40]:
# select() also accepts several column names as separate arguments.
df.select('age', 'name').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [45]:
# Derive a new column from an existing one; NULL ages stay NULL after multiplication.
# withColumn returns a new DataFrame, so df itself is left unchanged.
(
    df.withColumn('double_age', df['age'] * 2)
    .show()
)

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [47]:
# Rename a column; like withColumn, this returns a new DataFrame and does not modify df.
df.withColumnRenamed(existing='age', new='my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



TWORZENIE TYMCZASOWEGO WIDOKU I PRACA NA NIM

In [48]:
# Register df as a temporary view named 'people' so it can be queried with SQL.
df.createOrReplaceTempView(name='people')

In [49]:
# Query the temporary view with plain SQL; the result is again a DataFrame.
results = spark.sql(
    "SELECT * FROM people"
)

In [55]:
# Display up to 20 rows (the show() default) of the SQL query result.
results.show(n=20)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [57]:
# Filter rows in SQL; rows with a NULL age never satisfy age=30.
new_result = spark.sql(
    "SELECT * FROM people WHERE age=30"
)

In [59]:
# Display the filtered result.
new_result.show(n=20)

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

