# Аналіз даних у PySpark — результати

## Імпорт бібліотек

In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, sum as _sum, round, lit
from pyspark.sql.types import LongType, IntegerType, DoubleType
from pyspark.sql.functions import sum as spark_sum

## Дані

In [2]:
# Relative path to the input CSV; every part of the notebook passes it to spark.read...csv().
FILE_PATH = '../data/nuek-vuh3.csv'

## Частина 1: одна дія `collect()` після всіх перетворень

### Запускаємо сесію та Jobs

In [3]:
# Part 1: build the whole pipeline lazily and trigger it with a single collect().

# Best-effort cleanup: if an earlier cell left a session behind, stop it and
# deliberately ignore any error raised while doing so.
if "spark" in locals():
    try:
        spark.stop()
    except Exception:
        pass

# Local session using all available cores ("local[*]") and two shuffle partitions.
spark = (
    SparkSession.builder
    .appName("MyGoitSparkSandbox")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

# Read the CSV, taking column names from the header row and inferring column types.
nuek_df = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(FILE_PATH)
)

nuek_repart = nuek_df.repartition(numPartitions=2)

# Count rows per unit_id among rows with final_priority < 3, then keep only the
# units that have more than two such rows. No action runs between the
# aggregation and the final filter, so both live in one chain.
nuek_processed = (
    nuek_repart
    .filter("final_priority < 3")
    .select("unit_id", "final_priority")
    .groupBy("unit_id")
    .count()
    .filter("count>2")
)

# The only action in this cell; its return value is the cell output.
nuek_processed.collect()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/14 20:45:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[Row(unit_id='83', count=4)]

### Дивимось статистику [http://localhost:4040/jobs/](http://localhost:4040/jobs/)

### Зупиняємо сесію

In [4]:
# Preview the repartitioned frame, then shut the session down.
try:
    nuek_repart.show()
finally:
    # Stop the session even when show() raises, so it is not left running.
    spark.stop()

25/08/14 20:45:13 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----------+-------+---------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+-------------+----------------------+-------------------+--------------------+-------------+-------------------+---------+------------+----+-----------------+--------+--------------+--------+---------------+----------------+---------+------------------------------+------------------------+-------------------+---------------------------------+-------------+--------------------+-------------------+-------------------+---------------------------+
|call_number|unit_id|incident_number|       call_type|          call_date|         watch_date|      received_dttm|         entry_dttm|      dispatch_dttm|      response_dttm|      on_scene_dttm|transport_dttm|hospital_dttm|call_final_disposition|     available_dttm|             address|         city|zipcode_of_incident|battalion|station_area

## Частина 2: додаткова проміжна дія `collect()` без кешування

### Запускаємо сесію та Jobs

In [5]:
# Part 2: the aggregation is collected once on its own, then filtered and
# collected a second time (no caching between the two actions).

# Best-effort cleanup: if an earlier cell left a session behind, stop it and
# deliberately ignore any error raised while doing so.
if "spark" in locals():
    try:
        spark.stop()
    except Exception:
        pass

# Local session using all available cores ("local[*]") and two shuffle partitions.
spark = (
    SparkSession.builder
    .appName("MyGoitSparkSandbox")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

# Read the CSV, taking column names from the header row and inferring column types.
nuek_df = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(FILE_PATH)
)

nuek_repart = nuek_df.repartition(numPartitions=2)

# Count rows per unit_id among rows with final_priority < 3.
nuek_processed = (
    nuek_repart
    .filter("final_priority < 3")
    .select("unit_id", "final_priority")
    .groupBy("unit_id")
    .count()
)

# Intermediate action: runs the aggregation; the result is discarded here.
nuek_processed.collect()

# Keep only the units with more than two matching rows.
nuek_processed = nuek_processed.filter("count>2")

# Final action; its return value is the cell output.
nuek_processed.collect()

[Row(unit_id='83', count=4)]

### Дивимось статистику [http://localhost:4040/jobs/](http://localhost:4040/jobs/)

### Зупиняємо сесію

In [6]:
# Preview the repartitioned frame, then shut the session down.
try:
    nuek_repart.show()
finally:
    # Stop the session even when show() raises, so it is not left running.
    spark.stop()

+-----------+-------+---------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+-------------+----------------------+-------------------+--------------------+-------------+-------------------+---------+------------+----+-----------------+--------+--------------+--------+---------------+----------------+---------+------------------------------+------------------------+-------------------+---------------------------------+-------------+--------------------+-------------------+-------------------+---------------------------+
|call_number|unit_id|incident_number|       call_type|          call_date|         watch_date|      received_dttm|         entry_dttm|      dispatch_dttm|      response_dttm|      on_scene_dttm|transport_dttm|hospital_dttm|call_final_disposition|     available_dttm|             address|         city|zipcode_of_incident|battalion|station_area

## Частина 3: проміжна дія `collect()` з кешуванням (`cache()`)

### Запускаємо сесію та Jobs

In [7]:
# Part 3: the aggregation is marked with cache() before the first collect(),
# and the follow-up filter is built on top of that cached frame.

# Best-effort cleanup: if an earlier cell left a session behind, stop it and
# deliberately ignore any error raised while doing so.
if "spark" in locals():
    try:
        spark.stop()
    except Exception:
        pass

# Local session using all available cores ("local[*]") and two shuffle partitions.
spark = (
    SparkSession.builder
    .appName("MyGoitSparkSandbox")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

# Read the CSV, taking column names from the header row and inferring column types.
nuek_df = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(FILE_PATH)
)

nuek_repart = nuek_df.repartition(numPartitions=2)

# Count rows per unit_id among rows with final_priority < 3 and mark the
# aggregated frame for caching.
nuek_processed_cached = (
    nuek_repart
    .filter("final_priority < 3")
    .select("unit_id", "final_priority")
    .groupBy("unit_id")
    .count()
    .cache()
)

# First action on the cached frame; the result is discarded here.
nuek_processed_cached.collect()

# Keep only the units with more than two matching rows, starting from the cached frame.
nuek_processed = nuek_processed_cached.filter("count>2")

# Final action; its return value is the cell output.
nuek_processed.collect()

[Row(unit_id='83', count=4)]

### Дивимось статистику [http://localhost:4040/jobs/](http://localhost:4040/jobs/)

### Зупиняємо сесію

In [8]:
# Release the cached aggregation, preview the repartitioned frame, then shut
# the session down.
try:
    nuek_processed_cached.unpersist()
    nuek_repart.show()
finally:
    # Stop the session even when unpersist() or show() raises, so it is not
    # left running.
    spark.stop()

+-----------+-------+---------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+-------------+----------------------+-------------------+--------------------+-------------+-------------------+---------+------------+----+-----------------+--------+--------------+--------+---------------+----------------+---------+------------------------------+------------------------+-------------------+---------------------------------+-------------+--------------------+-------------------+-------------------+---------------------------+
|call_number|unit_id|incident_number|       call_type|          call_date|         watch_date|      received_dttm|         entry_dttm|      dispatch_dttm|      response_dttm|      on_scene_dttm|transport_dttm|hospital_dttm|call_final_disposition|     available_dttm|             address|         city|zipcode_of_incident|battalion|station_area