# Reading Writing and Validating Data in PySpark

In [1]:
# Сначала создадим наш экземпляр PySpark!

# Пользователи ПК могут использовать следующие две строки кода, но пользователям Mac это не нужно
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

# Может занять некоторое время на месте
spark = SparkSession.builder.appName("ReadWriteVal").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Reading data

DataFrame эквивалентен реляционной таблице в Spark SQL и может быть создан с использованием различных функций в SparkSession.

Сначала давайте попробуем прочитать файл csv, содержащий список учеников и их оценки.

**Source:** https://www.kaggle.com/spscientist/students-performance-in-exams

In [3]:
# Начнем с чтения базового набора данных csv
# Сообщите Spark о заголовке и определите типы схемы!

path ="Datasets/"

# Некоторые данные csv
students = spark.read.csv(path+'students.csv',inferSchema=True,header=True)

**Parquet Files**

Теперь попробуйте прочитать паркетный файл. Это наиболее распространенный тип данных в мире больших данных. Почему? потому что это самый компактный способ хранения файлов (даже лучше, чем заархивированные файлы!)

In [4]:
parquet = spark.read.parquet(path+'users1.parquet')
parquet.show(2)

+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|  registration_dttm| id|first_name|last_name|           email|gender|    ip_address|              cc|  country|birthdate|   salary|           title|comments|
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|2016-02-03 10:55:29|  1|    Amanda|   Jordan|ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|Internal Auditor|   1E+02|
|2016-02-03 20:04:03|  2|    Albert|  Freeman| afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|   Accountant IV|        |
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
only showing top 2 rows



**Разделенные паркетные файлы**

Фактически, большинство больших наборов данных будут разделены. Вот как можете собрать все части (части) набора данных одной простой командой.

In [5]:
partitioned = spark.read.parquet(path+'users*')
partitioned.show(2)

+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|  registration_dttm| id|first_name|last_name|           email|gender|    ip_address|              cc|  country|birthdate|   salary|           title|comments|
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|2016-02-03 10:55:29|  1|    Amanda|   Jordan|ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|Internal Auditor|   1E+02|
|2016-02-03 20:04:03|  2|    Albert|  Freeman| afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|   Accountant IV|        |
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
only showing top 2 rows



Вы также можете выбрать чтение только определенного набора паркетных файлов. Скажем, например, что вам нужны только пользователи1 и пользователи2, а не пользователи3

In [6]:
# Обратите внимание, что параметр .option ("basePath", path) используется для отмены автоматической функции
# что исключит секционированную переменную в результирующем фрейме данных.
# Я предпочитаю иметь информацию о разделах в моем новом фрейме данных лично.
users1_2 = spark.read.option("basePath", path).parquet(path+'users1.parquet', path+'users2.parquet')
users1_2.show()

+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|  registration_dttm| id|first_name|last_name|               email|gender|     ip_address|                 cc|             country| birthdate|   salary|               title|            comments|
+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|2016-02-03 10:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|    1.197.201.2|   6759521864920116|           Indonesia|  3/8/1971| 49756.53|    Internal Auditor|               1E+02|
|2016-02-03 20:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male| 218.111.175.34|                   |              Canada| 1/16/1968|150280.17|       Accountant IV|                    |
|2016-02-03 04:09:31|  3|

## Проверка данных

Затем нужно будет убедиться, что фрейм данных был прочитан правильно. Позже перейдем к более детальной оценке данных, но сначала нам нужно убедиться, что все типы переменных были введены правильно и что значения действительно были введены ... иногда это не так :)

In [7]:
# Получите первоначальное представление о вашем фрейме данных
students.show(3)

+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|   lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|standard|                   none|        72|           72|           74|
|female|       group C|               some college|standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|standard|                   none|        90|           95|           93|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
only showing top 3 rows



In [8]:
# Если ваш фрейм данных - это больше, чем просто несколько переменных, этот метод намного лучше
students.limit(5).toPandas()

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44
4,male,group C,some college,standard,none,76,78,75


In [9]:
# Обратите внимание на типы здесь:
print(type(students))
studentsPdf = students.toPandas()
print(type(studentsPdf))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


In [10]:
# Резюме ваших данных:

#show the data (like df.head())
print(students.printSchema())
print("")
print(students.columns)
print("")
print(students.describe()) # Not so fond of this one but to each their own

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)

None

['gender', 'race/ethnicity', 'parental level of education', 'lunch', 'test preparation course', 'math score', 'reading score', 'writing score']

DataFrame[summary: string, gender: string, race/ethnicity: string, parental level of education: string, lunch: string, test preparation course: string, math score: string, reading score: string, writing score: string]


In [11]:
# Если нужно получить тип только ОДНОГО столбца по имени, вы можете использовать эту функцию:
students.schema['math score'].dataType

IntegerType

In [12]:
# Аккуратная функция "описания"
students.describe(['math score']).show()

+-------+------------------+
|summary|        math score|
+-------+------------------+
|  count|              1000|
|   mean|            66.089|
| stddev|15.163080096009454|
|    min|                 0|
|    max|               100|
+-------+------------------+



In [13]:
# Сводная функция
students.select("math score", "reading score","writing score").summary("count", "min", "25%", "75%", "max").show()

+-------+----------+-------------+-------------+
|summary|math score|reading score|writing score|
+-------+----------+-------------+-------------+
|  count|      1000|         1000|         1000|
|    min|         0|           17|           10|
|    25%|        57|           59|           57|
|    75%|        77|           79|           79|
|    max|       100|          100|          100|
+-------+----------+-------------+-------------+



## Как указать типы данных при чтении в наборах данных.

Некоторые типы данных упрощают вывод схемы (например, табличные форматы, такие как csv, которые покажем позже).

Однако вам часто приходится устанавливать схему самостоятельно, если вы не имеете дело с методом .read, который не имеет встроенного inferSchema ().

В Spark есть все необходимые для этого инструменты, просто для этого требуется очень специфическая структура:

In [14]:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType,DateType

Далее нам нужно создать список полей структуры 
    * : имя параметра: строка, имя поля. 
    * : param dataType:: class: DataType поля. 
    * : param nullable: boolean, может ли поле быть нулевым (None) или нет.

In [15]:
data_schema = [StructField("name", StringType(), True),
               StructField("email", StringType(), True),
               StructField("city", StringType(), True),
               StructField("mac", StringType(), True),
               StructField("timestamp", DateType(), True),
               StructField("creditcard", StringType(), True)]

In [16]:
final_struc = StructType(fields=data_schema)

На этот раз мы сделаем файл .json :)

In [17]:
people = spark.read.json(path+'people.json', schema=final_struc)

In [18]:
people.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)



In [19]:
people.limit(4).toPandas()

Unnamed: 0,name,email,city,mac,timestamp,creditcard
0,,,,,,
1,Keeley Bosco,katlyn@jenkinsmaggio.net,Lake Gladysberg,08:fd:0b:cd:77:f7,2015-04-25,1228-1221-1221-1431
2,Rubye Jerde,juvenal@johnston.name,,90:4d:fa:42:63:a2,2015-04-25,1228-1221-1221-1431
3,Miss Darian Breitenberg,,,f9:0e:d3:40:cb:e9,2015-04-25,


## Запись данных


In [25]:
values = [('Pear',10),('Orange',36),('Banana',123),('Kiwi',48),('Peach',16),('Strawberry',1)]
df = spark.createDataFrame(values,['fruit','quantity'])
df.show()

+----------+--------+
|     fruit|quantity|
+----------+--------+
|      Pear|      10|
|    Orange|      36|
|    Banana|     123|
|      Kiwi|      48|
|     Peach|      16|
|Strawberry|       1|
+----------+--------+

