# 0. Подключение библиотеки pyspark

In [49]:
import os
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.sql import SparkSession

# 1. Создание SparkSession и SparkContext

In [2]:
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

# 2. Конфигурирование доступа к хранилищу S3

Укажите параметры доступа к своему хранилищу S3: Endpoint, Acess key, Secret key

In [None]:
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "https://your_endpoint_name")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "your_access_key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "your_secret_key")

# 3. Загрузка датасета

Определим схему данных датасета, и создадим Spark Dataframe, посредством которого мы будем работать с данными:

In [3]:
schema = T.StructType([
    T.StructField('num', T.IntegerType(), True),
    T.StructField('sensor_id', T.IntegerType(), True),
    T.StructField('location', T.IntegerType(), True),
    T.StructField('lat', T.DoubleType(), True),
    T.StructField('lon', T.DoubleType(), True),
    T.StructField('timestamp', T.TimestampType(), True),
    T.StructField('pressure', T.DoubleType(), True),
    T.StructField('temperature', T.DoubleType(), True),
    T.StructField('humidity', T.DoubleType(), True)
])

В качестве данных используем CSV-файл объемом 7.8 GB, собранный из данных, расположенных на https://www.kaggle.com/hmavrodiev/sofia-air-quality-dataset. Данные содержат записи с датчиков погоды.

Укажите путь до датасета на вашем бакете S3:

In [8]:
path = 's3a://your_bucket_name/path/dataset.csv'

df = spark \
    .read \
    .format('csv') \
    .options(header='true') \
    .schema(schema) \
    .load(path)
    
df = df.drop('num').withColumn('hour', F.hour(F.col('timestamp')))

In [10]:
df.printSchema()

root
 |-- sensor_id: integer (nullable = true)
 |-- location: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- pressure: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- hour: integer (nullable = true)



Темерь мы можем увидеть Spark Dataset:

In [17]:
df.show(10)

+---------+--------+------------------+------------------+-------------------+--------+-----------+--------+----+
|sensor_id|location|               lat|               lon|          timestamp|pressure|temperature|humidity|hour|
+---------+--------+------------------+------------------+-------------------+--------+-----------+--------+----+
|     2266|    1140|            42.738|            23.272|2017-07-01 00:00:07|95270.27|      23.46|   62.48|   0|
|     2292|    1154|42.663000000000004|23.273000000000003|2017-07-01 00:00:08|94355.83|      23.06|   59.46|   0|
|     3096|    1558|              42.7|             23.36|2017-07-01 00:00:10|95155.81|      26.53|   44.38|   0|
|     3428|    1727|42.623999999999995|            23.406|2017-07-01 00:00:12|94679.57|      28.34|   38.28|   0|
|     3472|    1750|            42.669|            23.318|2017-07-01 00:00:13|94327.88|      26.31|   46.37|   0|
|     1952|     976|42.708999999999996|23.398000000000003|2017-07-01 00:00:13|95314.52| 

Посчитаем количество строк до препроцессинга (это может занять время):

In [18]:
df.count()

97288452

# 4. Препроцессинг данных

Если мы хотим использовать SQL-синтаксис для запросов Spark, мы должны зарегистрировать временное представление для данных (ограниченное в рамках вашей сессии Spark). После этого мы сможем обращаться к нему по имени:

In [19]:
df.createOrReplaceTempView('weather')

Команда ниже запускает типичное задание Spark и собирает результаты на Spark Driver. Запрос выбирает данные за дневные периоы, группирует по расположению, и подсчитывает некоторые статистики для каждого расположения (это может занять время):

In [20]:
result = spark.sql('''
    select 
        location as location_id, 
        count(1) as data_num, 
        avg(pressure) as mean_pressure,
        avg(humidity) as mean_humidity,
        max(temperature) as max_temp
    from weather 
    where hour > 9 and hour < 20 
    group by location
''').collect()

In [21]:
print(len(result))

485


Препроцессинг был успешно осуществлён, размер датасета уменьшился с 97 288 452 до 485 строк. Теперь мы можем, к примеру, загрузить данные в датафрейм Pandas, чтобы продолжить работу с ними на Spark Driver или в любом другом расположении:

In [27]:
import pandas as pd

pd.DataFrame.from_records(map(lambda x: x.asDict(), result))

Unnamed: 0,location_id,data_num,mean_pressure,mean_humidity,max_temp
0,1025,175218,95410.270136,60.886515,41.91
1,2580,97453,93361.977317,71.473486,31.33
2,12006,29580,94663.426777,48.635029,47.89
3,1139,158230,94872.799095,51.396233,43.61
4,3488,141273,93170.706796,61.865252,38.41
...,...,...,...,...,...
480,3050,107306,89184.191561,63.792659,36.01
481,3245,123686,94246.646024,65.139249,36.09
482,3376,143159,94642.903072,50.533826,36.56
483,5836,26272,95171.041646,52.475399,33.29
