In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json,col
from pyspark.sql.types import *
from os.path import abspath

spark = SparkSession\
        .builder\
        .appName("pyspark-notebook")\
        .master("spark://spark-master:7077")\
        .config("spark.executor.memory", "512m")\
        .config("hive.metastore.uris", "thrift://hive-metastore:9083")\
        .config("spark.sql.warehouse.dir", "/user/hive/warehouse")\
        .enableHiveSupport()\
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

24/10/23 02:45:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## 3. The Data

In [2]:
!apt install wget -y

Reading package lists... Done
Building dependency tree       
Reading state information... Done
wget is already the newest version (1.20.1-1.1).
0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.


In [3]:
data = spark.read.csv(path="AB_NYC_2019.csv", sep=",", header=True)

                                                                                

In [4]:
data.head()

Row(id='2539', name='Clean & quiet apt home by the park', host_id='2787', host_name='John', neighbourhood_group='Brooklyn', neighbourhood='Kensington', latitude='40.64749', longitude='-73.97237', room_type='Private room', price='149', minimum_nights='1', number_of_reviews='9', last_review='2018-10-19', reviews_per_month='0.21', calculated_host_listings_count='6', availability_365='365')

In [5]:
# Вывод структуры DataFrame и типов столбцов
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: string (nullable = true)



In [6]:
# Создание и заполнение таблицы в Hive
data.write.mode("overwrite").saveAsTable("AB_NYC_price")

# SQL-запрос для расчета среднего значения и дисперсии
query = """
SELECT 
    AVG(price) as mean_price,
    VARIANCE(price) as variance_price,
    COUNT(*) as total_count
FROM AB_NYC_price
WHERE price IS NOT NULL
"""

# Выполнение запроса
result = spark.sql(query)

# Вывод результатов
result.show()

                                                                                

+------------------+-----------------+-----------+
|        mean_price|   variance_price|total_count|
+------------------+-----------------+-----------+
|152.22296299343384|56902.04073527261|      48894|
+------------------+-----------------+-----------+



In [7]:
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

# Фильтрация строк, где price не является числом
data = data.filter(F.col("price").rlike("^[0-9]*\\.?[0-9]*$"))
data = data.fillna({"price": 0})  # Заменить null на 0

# Преобразование столбца "price" из String в Float
data = data.withColumn("price", F.col("price").cast(FloatType()))

# Проверка на наличие null значений после преобразования
null_count = data.filter(data.price.isNull()).count()
if null_count > 0:
    print(f"В столбце 'price' {null_count} значений не удалось преобразовать в тип Float.")

# Вычисление количества значений и общей суммы
result = data.agg(
    F.count("price").alias("count_prices"),  # Подсчет количества значений в столбце 'price'
    F.sum("price").alias("total_sum")        # Подсчет общей суммы в столбце 'price'
)

# Показать результат
result.show()

+------------+---------+
|count_prices|total_sum|
+------------+---------+
|       48885|7441872.0|
+------------+---------+



In [8]:
from pyspark.sql import functions as F

# Вычисление среднего значения и дисперсии
result = data.agg(
    F.avg("price").alias("average_price"),
    F.variance("price").alias("variance_price")
)

# Показать результат
result.show()

+------------------+-----------------+
|     average_price|   variance_price|
+------------------+-----------------+
|152.23221847192391|56902.27481089729|
+------------------+-----------------+



In [9]:
pip install pandas

You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [10]:
import pandas as pd
from functools import reduce

# Чтение CSV файла в DataFrame
data = pd.read_csv("AB_NYC_2019.csv")

# Удаление нечисловых символов и преобразование в float
data['price'] = data['price'].replace('[\$,]', '', regex=True).astype(float)

# Проверка на наличие NaN значений
nan_count = data['price'].isnull().sum()
if nan_count > 0:
    print(f"В столбце 'price' {nan_count} значений не удалось преобразовать в float.")

# Функция mapper для извлечения цен
def mapper(row):
    return row['price'] if pd.notnull(row['price']) else None

# Применяем mapper к каждой строке DataFrame
scores = data.apply(mapper, axis=1).dropna().tolist()  # Убираем NaN значения

# Функция reducer для вычисления n, mean и M2
def reducer(score_data, score_data2):
    n, mean, M2 = score_data  # score_data - это (n, mean, M2)
    score = score_data2  # score_data2 - это одно значение
    n += 1
    delta = score - mean
    mean += delta / n
    M2 += delta * (score - mean)
    return n, mean, M2

# Проверка наличия оценок и выполнение редукции
if not scores:
    print("Нет действительных оценок.")
else:
    initial_value = (0, 0.0, 0)  # Начальное значение для n, mean, M2
    n, mean, M2 = reduce(reducer, scores, initial_value)
    
    # Дисперсия
    variance = M2 / n if n > 1 else 0  # Избегаем деления на 0
    print("Среднее значение:", mean)
    print("Дисперсия:", variance)

# Подсчет значений после очистки
cleaned_scores = data['price'].dropna().tolist()
print("Количество значений после очистки:", len(cleaned_scores))
print("Общая сумма после очистки:", sum(cleaned_scores))

Среднее значение: 152.72068718682823
Дисперсия: 57672.845698433375
Количество значений после очистки: 48895
Общая сумма после очистки: 7467278.0


Вывод:
- Обнаружено небольшое расхождение в количестве обработанных записей (10 записей), которое отражается на всех остальных показателях.
- Причины различий могут быть в разных подходах к фильтрации невалидных данных, различных способах обработки пропущенных значений, разных критериях отбора записей.
- Несмотря на небольшие различия, оба метода показывают согласованные результаты: все отклонения менее 1.5%.