In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m21.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=da0e40ea5c8693b86872041107c7b59d76a9fe72fe9127510abbfc92eae401ed
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [3]:
import random
import pyspark

In [4]:
sc = pyspark.SparkContext()

In [5]:
def inside(p):
    x, y = random.random(), random.random()
    return x * x + y * y < 1

In [6]:
NUM_SAMPLES = 10 ** 7
count = sc.parallelize(range(NUM_SAMPLES)).filter(inside).count()
approx_pi = (4.0 * count / NUM_SAMPLES)
print(f"Pi is roughly {approx_pi}")

Pi is roughly 3.1419012


In [7]:
!pip install faker

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting faker
  Downloading Faker-18.3.4-py3-none-any.whl (1.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m51.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: faker
Successfully installed faker-18.3.4


In [8]:
import pandas as pd
from random import randint

from pyspark.sql import Row
from pyspark.sql import SparkSession

from datetime import datetime, date
from faker import Faker

In [9]:
spark_session = SparkSession.builder.getOrCreate()

In [10]:
fake = Faker()


In [11]:
users_df = spark_session.createDataFrame([
    Row(
        first_name=fake.first_name(),
        last_name=fake.last_name(),
        date_of_birth=fake.date_of_birth(),
        address=fake.address(),
        salary=randint(2000, 15000),
        age=randint(25, 60),
        city=fake.city()
    )
    for _ in range(10)
])

In [12]:
users_df.show()

+----------+---------+-------------+--------------------+------+---+-----------------+
|first_name|last_name|date_of_birth|             address|salary|age|             city|
+----------+---------+-------------+--------------------+------+---+-----------------+
|      Chad|      Fry|   1976-03-30|146 Fischer Valle...|  5559| 42|    Catherineside|
|     Tracy|   Wright|   1950-11-11|83562 Erik Branch...| 11044| 35|     Abigailville|
|     David|   Hodges|   1920-10-18|71949 Lawson Prai...| 13178| 38|      Jeffreyfort|
|    Rodney|     Cole|   1907-11-06|07936 Kristina Pa...| 13529| 57|     Matthewmouth|
|     Edwin|    Baker|   1983-05-11|309 Johnson Skywa...| 13603| 56|     Stephenmouth|
|   Gabriel|  Burgess|   1995-01-15|0092 Jessica Ranc...| 14887| 50| East Jeffreytown|
|   Michael|   Fowler|   1941-06-16|PSC 3513, Box 062...| 13664| 37|      Andrewville|
|   Jessica|  Johnson|   1911-02-06|Unit 7237 Box 048...|  9578| 56|       North Beth|
|      Lisa|    Bates|   1999-02-19|087 Jas

вычислим средний возраст пользователей и его среднеквадратическое отклонение

In [13]:
from pyspark.sql.functions import mean, stddev, round as _round

In [14]:
users_df.select(
    mean("age").alias("average age"),
    _round(stddev("age"), 2).alias("age stddev")
).show()

+-----------+----------+
|average age|age stddev|
+-----------+----------+
|       46.0|      9.45|
+-----------+----------+



коэффициент корреляции между зарплатой и возрастом

In [15]:
users_df.corr("age", "salary")

0.128056026266976

средняя зарплата по городам

In [16]:
users_df.groupBy("city").avg("age").show()

+-----------------+--------+
|             city|avg(age)|
+-----------------+--------+
|     Stephenmouth|    56.0|
|      Jeffreyfort|    38.0|
|    Catherineside|    42.0|
|     Abigailville|    35.0|
|     Matthewmouth|    57.0|
|       North Beth|    56.0|
|  North Francisco|    54.0|
| East Jeffreytown|    50.0|
|      Andrewville|    37.0|
|South Deborahbury|    35.0|
+-----------------+--------+



средний возраст по городам

In [17]:
users_df.groupBy("city").avg("salary").show()

+-----------------+-----------+
|             city|avg(salary)|
+-----------------+-----------+
|     Stephenmouth|    13603.0|
|      Jeffreyfort|    13178.0|
|    Catherineside|     5559.0|
|     Abigailville|    11044.0|
|     Matthewmouth|    13529.0|
|       North Beth|     9578.0|
|  North Francisco|     3388.0|
| East Jeffreytown|    14887.0|
|      Andrewville|    13664.0|
|South Deborahbury|     4366.0|
+-----------------+-----------+



если мы хотим одновременно усреднить возраст и зарплату по городам, то достаточно выполнить следующий код:

In [18]:
users_df.groupBy("city").avg().show()

+-----------------+-----------+--------+
|             city|avg(salary)|avg(age)|
+-----------------+-----------+--------+
|     Stephenmouth|    13603.0|    56.0|
|      Jeffreyfort|    13178.0|    38.0|
|    Catherineside|     5559.0|    42.0|
|     Abigailville|    11044.0|    35.0|
|     Matthewmouth|    13529.0|    57.0|
|       North Beth|     9578.0|    56.0|
|  North Francisco|     3388.0|    54.0|
| East Jeffreytown|    14887.0|    50.0|
|      Andrewville|    13664.0|    37.0|
|South Deborahbury|     4366.0|    35.0|
+-----------------+-----------+--------+



Давайте попробуем вычислить с помощью этого метода средний возраст пользователей, находящихся в датафрейме

In [19]:
users_df.agg({"age": "avg"}).show()

+--------+
|avg(age)|
+--------+
|    46.0|
+--------+



Метод agg можно использовать в связке с методом groupBy. Например среднюю зарплату по городам можно было вычислить еще и так

In [20]:
users_df.groupBy("city").agg({"age": "avg"}).show()

+-----------------+--------+
|             city|avg(age)|
+-----------------+--------+
|     Stephenmouth|    56.0|
|      Jeffreyfort|    38.0|
|    Catherineside|    42.0|
|     Abigailville|    35.0|
|     Matthewmouth|    57.0|
|       North Beth|    56.0|
|  North Francisco|    54.0|
| East Jeffreytown|    50.0|
|      Andrewville|    37.0|
|South Deborahbury|    35.0|
+-----------------+--------+



Предположим, что зарплата наших пользователей указана без учета вычета налогов. Напишем и применим пользовательскую функцию, которая пересчитает зарплату с учетом налоговых сборов. Для этого необходимо просто написать функцию и применить к ней декоратор udf, находящийся в pyspark.sql.functions.

In [21]:
from pyspark.sql.functions import udf

@udf('float')
def amount_net(amount_gross: float) -> float:
    return amount_gross * 0.19

In [22]:
users_df.select(amount_net(users_df.salary).alias("salary_net")).show()

+----------+
|salary_net|
+----------+
|   1056.21|
|   2098.36|
|   2503.82|
|   2570.51|
|   2584.57|
|   2828.53|
|   2596.16|
|   1819.82|
|    829.54|
|    643.72|
+----------+



Иногда имея PySpark датафрейм нам все же хочется превратить его в pandas датафрейм. Подобное желание, вполне, осуществимо благодаря методу toPandas.

In [23]:
users_pd_df = users_df.toPandas()

In [24]:
users_pd_df

Unnamed: 0,first_name,last_name,date_of_birth,address,salary,age,city
0,Chad,Fry,1976-03-30,"146 Fischer Valley Suite 823\nAustinborough, S...",5559,42,Catherineside
1,Tracy,Wright,1950-11-11,"83562 Erik Branch\nSmithmouth, NY 49099",11044,35,Abigailville
2,David,Hodges,1920-10-18,"71949 Lawson Prairie Apt. 147\nVargaschester, ...",13178,38,Jeffreyfort
3,Rodney,Cole,1907-11-06,07936 Kristina Passage Apt. 267\nSouth Shannon...,13529,57,Matthewmouth
4,Edwin,Baker,1983-05-11,"309 Johnson Skyway Apt. 733\nCarolynberg, CO 6...",13603,56,Stephenmouth
5,Gabriel,Burgess,1995-01-15,"0092 Jessica Ranch Apt. 798\nBrownhaven, NC 02078",14887,50,East Jeffreytown
6,Michael,Fowler,1941-06-16,"PSC 3513, Box 0628\nAPO AE 41748",13664,37,Andrewville
7,Jessica,Johnson,1911-02-06,Unit 7237 Box 0482\nDPO AE 01640,9578,56,North Beth
8,Lisa,Bates,1999-02-19,"087 Jason Walk Suite 014\nLake Jeffery, WV 74137",4366,35,South Deborahbury
9,Ana,Cardenas,1989-03-10,"566 Tony Locks Apt. 925\nAshleystad, NH 31193",3388,54,North Francisco


**SQL в spark**

ля того, чтобы создать представление для датафрейма с пользователями нужно выполнить следующий код

In [25]:
users_df.createOrReplaceTempView("Users")

Теперь объекту сессии spark_session через метод sql достаточно передать SQL запрос в котором мы обращаемся к представлению Users, созданному выше.

In [26]:
users_df.createOrReplaceTempView("Users")
spark_session.sql("Select city, avg(age) From Users Group By city").show()

+-----------------+--------+
|             city|avg(age)|
+-----------------+--------+
|     Stephenmouth|    56.0|
|      Jeffreyfort|    38.0|
|    Catherineside|    42.0|
|     Abigailville|    35.0|
|     Matthewmouth|    57.0|
|       North Beth|    56.0|
|  North Francisco|    54.0|
| East Jeffreytown|    50.0|
|      Andrewville|    37.0|
|South Deborahbury|    35.0|
+-----------------+--------+



**Сохранение данных**

Мы же с вами давайте сохраним в текстовом файле результаты предыдущего запроса

In [27]:
users_df.createOrReplaceTempView("Users")
result = spark_session.sql("Select city, avg(age) From Users Group By city")
result.rdd.saveAsTextFile("results.txt")