In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, col
from pyspark.sql.types import *
from os.path import abspath

spark = SparkSession\
    .builder\
    .appName("pyspark-notebook")\
    .master("spark://spark-master:7077")\
    .config("spark.executor.memory", "512m")\
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")\
    .enableHiveSupport()\
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")


In [21]:
ab_nyc_data = spark.read.csv(
    path="datasets/AB_NYC_2019.csv", sep=",", header=True)


In [22]:
n = ab_nyc_data.count()


                                                                                

49079

In [23]:
ab_nyc_data.write.saveAsTable('ab_nyc_2019')


                                                                                

In [58]:
result = spark.sql(
    "SELECT count(price), avg(price) as mean, variance (price) as variance FROM ab_nyc_2019;")
sql_count, sql_mean, sql_variance = result.first()


In [92]:
from functools import reduce


def mapper(data):
    num = 0
    try:
        price = int(data)
        num = 1
    except (ValueError, TypeError):
        price = 0
    return [num, price, 0]  # номер строки, цена, дисперсия


def reducer(data_prev, data_current):
    if data_current[0] != 0:
        n = data_prev[0] + data_current[0]
        delta = data_current[1] - data_prev[1]
        mean = data_prev[1] + delta/n
        variance = data_prev[2] + (delta * (data_current[1] - data_prev[1]))
        return [n, mean, variance]
    else:
        return data_prev


count, mean, variance = reduce(reducer, map(
    mapper, [row.price for row in ab_nyc_data.select('price').collect()]))
variance = variance**0.5

print(
    f'Рассчеты в SQL. Количество: {hive_count}, Средняя цена: {sql_mean}, дисперсия: {sql_variance}')
print(
    f'Рассчеты в Python. Количество: {count}, Средняя цена: {mean}, дисперсия: {variance}')


Рассчеты в SQL. Количество: 48894, Средняя цена: 152.22296299343384, дисперсия: 56902.04073527261
Рассчеты в Python. Количество: 48885, Средняя цена: 152.2322184719247, дисперсия: 52743.84133584466
