# MGIMO intensive

## Big Data with Apache Spark

![Apache Spark](imgs/spark.png)

### 1. Libraries

In [None]:
import os
import sys
import json
import boto3
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
pd.set_option('display.max_columns', None)

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, struct, count_distinct, from_unixtime

In [None]:
# web UI for the Spark

def uiWebUrl(self):
    from urllib.parse import urlparse
    web_url = self._jsc.sc().uiWebUrl().get()
    port = urlparse(web_url).port
    return '{}proxy/{}/jobs/'.format(os.environ['JUPYTERHUB_SERVICE_PREFIX'], port)

SparkContext.uiWebUrl = property(uiWebUrl)

# Spark settings
conf = SparkConf()
conf.set('spark.master', 'local[*]')
conf.set('spark.driver.memory', '4G')
conf.set('spark.driver.maxResultSize', '2G')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

spark

### 2. Look at the data

In [None]:
file_path = "./data_section37_112_v20250918_section_file/data_section37_112_v20250918.parquet"
sdf = spark.read.parquet(file_path)

In [None]:
sdf.printSchema()

In [None]:
sdf.count()

In [None]:
file_paths = [
    "./data_section35_112_v20250918_section_file/data_section35_112_v20250918.parquet",
    "./data_section37_112_v20250918_section_file/data_section37_112_v20250918.parquet",
    "./data_section39_112_v20250918_section_file/data_section39_112_v20250918.parquet"
]
sdf = spark.read.parquet(*file_paths)
sdf.printSchema()

In [None]:
sdf.count()

### 3. Analytics with Spark

This prompt will help us:

```prompt
## Базовые параметры
- Роль: Junior Python Developer (Data Analyst)
- Специализация: data analysis, data visualization, data collection
- Уровень: начинающий
- Температура: 0 (максимальная точность и предсказуемость)

## Контекст выполнения
Разработка кода для анализа больших данных

## Входные данные
- PySpark датафрейм `sdf` c данными 
- Формат файла: `parquet`

## Технические ограничения
- Использовать PySpark и стандартные библиотеки Python
- Код запускается в интерактивном ноутбуке Jupyter
- Необходимо учесть производительность, количество записей 5 млн

## Требования к реализации

### Функциональные требования
1. Требуется аналитика по основным показателям, доступным в датафрейме
2. Требуется общая описательная аналитика 
3. Визуализация стандартными библиотеками 
4. Необходимо разбить анализ на последовательные блоки (общие показатели, отдельные тренды, визуализация, гипотезы)

### Технические требования
- Сложность кода: не используй классы, ограничься функциями
- Архитектура: упрощенная, для использования в интерактивных ноутбуках
- Стиль кода: PEP 8, black (длина строки 79)
- Документация: Docstrings в стиле Google
- Безопасность: Никаких захардкоженных credentials

### Структура данных 
sdf.printSchema()
<ВЫВОД КОМАНДЫ>

### Требования к библиотекам
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, struct, count_distinct, from_unixtime
```

In [None]:
# =============================================================================
# БЛОК 1: ОБЩИЕ ПОКАЗАТЕЛИ ДАТАФРЕЙМА
# =============================================================================

def get_basic_statistics(df):
    """
    Расчет базовых статистик датафрейма.
    
    Args:
        df: PySpark DataFrame
    
    Returns:
        Dictionary с основными метриками
    """
    basic_stats = {}
    
    # Количество уникальных значений в ключевых колонках
    basic_stats['total_records'] = df.count()
    basic_stats['unique_regions'] = df.select('region_name').distinct().count()
    basic_stats['unique_municipalities'] = df.select('municipality').distinct().count()
    basic_stats['unique_indicators'] = df.select('indicator_code').distinct().count()
    basic_stats['year_range'] = df.agg(
        F.min('year').alias('min_year'),
        F.max('year').alias('max_year')
    ).collect()[0]
    
    # Проверка на пропущенные значения
    null_counts = df.select([
        F.sum(F.col(c).isNull().cast('int')).alias(c) 
        for c in ['indicator_value', 'region_name', 'municipality', 'year']
    ]).collect()[0]
    
    basic_stats['null_values'] = null_counts.asDict()
    
    return basic_stats

# Вывод общей статистики
basic_stats = get_basic_statistics(sdf)
print("\n" + "="*50)
print("ОБЩАЯ СТАТИСТИКА ДАННЫХ")
print("="*50)
print(f"Всего записей: {basic_stats['total_records']:,}")
print(f"Количество регионов: {basic_stats['unique_regions']}")
print(f"Количество муниципалитетов: {basic_stats['unique_municipalities']}")
print(f"Количество показателей: {basic_stats['unique_indicators']}")
print(f"Период данных: {basic_stats['year_range']['min_year']} - {basic_stats['year_range']['max_year']}")
print(f"Пропущенные значения: {basic_stats['null_values']}")

In [None]:
# =============================================================================
# БЛОК 2: ОПИСАТЕЛЬНАЯ СТАТИСТИКА ПОКАЗАТЕЛЕЙ
# =============================================================================

def get_descriptive_stats(df):
    """
    Расчет описательных статистик для числовых показателей.
    
    Args:
        df: PySpark DataFrame
    
    Returns:
        PySpark DataFrame с описательными статистиками
    """
    # Статистики по каждому показателю
    descriptive_stats = df.groupBy('indicator_code', 'indicator_name') \
        .agg(
            F.count('indicator_value').alias('count'),
            F.avg('indicator_value').alias('mean'),
            F.stddev('indicator_value').alias('stddev'),
            F.min('indicator_value').alias('min'),
            F.percentile_approx('indicator_value', 0.25).alias('q1'),
            F.percentile_approx('indicator_value', 0.5).alias('median'),
            F.percentile_approx('indicator_value', 0.75).alias('q3'),
            F.max('indicator_value').alias('max')
        ) \
        .orderBy('indicator_code')
    
    return descriptive_stats

# Получение описательной статистики
desc_stats = get_descriptive_stats(sdf)
print("\n" + "="*50)
print("ОПИСАТЕЛЬНАЯ СТАТИСТИКА ПОКАЗАТЕЛЕЙ")
print("="*50)
#desc_stats.show(10, truncate=False)
display(desc_stats.limit(10).toPandas())

# Анализ по годам
def yearly_summary(df):
    """
    Сводка по годам: количество записей и средние значения.
    """
    yearly_stats = df.groupBy('year') \
        .agg(
            F.count('*').alias('records_count'),
            F.avg('indicator_value').alias('avg_indicator_value'),
            F.countDistinct('indicator_code').alias('unique_indicators'),
            F.countDistinct('municipality').alias('municipalities_count')
        ) \
        .orderBy('year')
    
    return yearly_stats

yearly_stats = yearly_summary(sdf)
print("\nСтатистика по годам:")
display(yearly_stats.toPandas())

In [None]:
# =============================================================================
# БЛОК 3: ВИЗУАЛИЗАЦИЯ ОСНОВНЫХ ТРЕНДОВ
# =============================================================================

def plot_yearly_trends(df, top_n=5):
    """
    Визуализация трендов по годам для топ показателей.
    
    Args:
        df: PySpark DataFrame
        top_n: количество показателей для визуализации
    """
    # Определяем топ показателей по количеству записей
    top_indicators = df.groupBy('indicator_code', 'indicator_name') \
        .agg(F.count('*').alias('count')) \
        .orderBy(F.desc('count')) \
        .limit(top_n) \
        .select('indicator_code') \
        .rdd.flatMap(lambda x: x).collect()
    
    # Собираем данные для визуализации
    trend_data = []
    for indicator in top_indicators:
        indicator_data = df.filter(F.col('indicator_code') == indicator) \
            .groupBy('year') \
            .agg(F.avg('indicator_value').alias('avg_value')) \
            .orderBy('year') \
            .toPandas()
        
        indicator_data['indicator'] = indicator
        trend_data.append(indicator_data)
    
    # Создание визуализации
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    
    # График 1: Тренды топ показателей
    ax1 = axes[0]
    for data in trend_data:
        if not data.empty:
            ax1.plot(data['year'], data['avg_value'], marker='o', label=data['indicator'].iloc[0])
    
    ax1.set_xlabel('Год')
    ax1.set_ylabel('Среднее значение')
    ax1.set_title('Динамика топ показателей по годам')
    ax1.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    ax1.grid(True, alpha=0.3)
    
    # График 2: Распределение записей по годам
    ax2 = axes[1]
    yearly_counts = df.groupBy('year').count().orderBy('year').toPandas()
    ax2.bar(yearly_counts['year'], yearly_counts['count'], color='skyblue', edgecolor='navy')
    ax2.set_xlabel('Год')
    ax2.set_ylabel('Количество записей')
    ax2.set_title('Распределение записей по годам')
    ax2.grid(True, alpha=0.3, axis='y')
    
    plt.tight_layout()
    plt.show()

def plot_regional_distribution(df):
    """
    Визуализация распределения данных по регионам.
    """
    # Топ регионов по количеству записей
    region_stats = df.groupBy('region_name') \
        .agg(F.count('*').alias('records')) \
        .orderBy(F.desc('records')) \
        .limit(15) \
        .toPandas()
    
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    
    # Горизонтальная бар-чарт
    ax1 = axes[0]
    y_pos = np.arange(len(region_stats))
    ax1.barh(y_pos, region_stats['records'])
    ax1.set_yticks(y_pos)
    ax1.set_yticklabels(region_stats['region_name'])
    ax1.set_xlabel('Количество записей')
    ax1.set_title('Топ-15 регионов по количеству записей')
    ax1.invert_yaxis()
    
    # Круговая диаграмма распределения по типам муниципалитетов
    ax2 = axes[1]
    mun_type_stats = df.groupBy('mun_type') \
        .agg(F.count('*').alias('count')) \
        .orderBy(F.desc('count')) \
        .toPandas()
    
    # Ограничим количество категорий для читаемости
    other_sum = mun_type_stats[mun_type_stats['count'] < mun_type_stats['count'].quantile(0.7)]['count'].sum()
    main_categories = mun_type_stats[mun_type_stats['count'] >= mun_type_stats['count'].quantile(0.7)]
    
    if other_sum > 0:
        main_categories = pd.concat([
            main_categories,
            pd.DataFrame({'mun_type': ['Другие'], 'count': [other_sum]})
        ])
    
    ax2.pie(main_categories['count'], labels=main_categories['mun_type'], autopct='%1.1f%%')
    ax2.set_title('Распределение по типам муниципалитетов')
    
    plt.tight_layout()
    plt.show()

# Вызов функций визуализации
print("\n" + "="*50)
print("ВИЗУАЛИЗАЦИЯ ДАННЫХ")
print("="*50)
plot_yearly_trends(sdf)
plot_regional_distribution(sdf)

In [None]:
# =============================================================================
# БЛОК 4: АНАЛИЗ ГИПОТЕЗ
# =============================================================================

def analyze_correlations(df):
    """
    Анализ корреляций между показателями.
    
    Args:
        df: PySpark DataFrame
    """
    # Выбираем топ показателей по встречаемости
    top_indicators = df.groupBy('indicator_code') \
        .agg(F.count('*').alias('count')) \
        .orderBy(F.desc('count')) \
        .limit(10) \
        .select('indicator_code') \
        .rdd.flatMap(lambda x: x).collect()
    
    # Создаем сводную таблицу для корреляционного анализа
    pivot_df = df.filter(F.col('indicator_code').isin(top_indicators)) \
        .groupBy('municipality', 'year') \
        .pivot('indicator_code') \
        .agg(F.first('indicator_value'))
    
    # Конвертируем в pandas для корреляционного анализа (только для небольшого объема)
    # Внимание: используйте только для агрегированных данных, не для полного датафрейма
    pandas_df = pivot_df.limit(1000).toPandas()
    
    # Удаляем нечисловые колонки для корреляции
    numeric_cols = pandas_df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 1:
        corr_matrix = pandas_df[numeric_cols].corr()
        
        # Визуализация корреляционной матрицы
        fig, ax = plt.subplots(figsize=(10, 8))
        im = ax.imshow(corr_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
        
        # Настройка отображения
        ax.set_xticks(np.arange(len(numeric_cols)))
        ax.set_yticks(np.arange(len(numeric_cols)))
        ax.set_xticklabels(numeric_cols, rotation=45, ha='right')
        ax.set_yticklabels(numeric_cols)
        
        plt.colorbar(im)
        plt.title('Корреляционная матрица топ показателей')
        plt.tight_layout()
        plt.show()
        
        return corr_matrix
    
    return None

def check_data_quality_hypothesis(df):
    """
    Проверка гипотез о качестве данных.
    """
    hypotheses_results = {}
    
    # Гипотеза 1: Пропуски в данных увеличиваются со временем
    nulls_by_year = df.groupBy('year') \
        .agg(
            F.sum(F.col('indicator_value').isNull().cast('int')).alias('nulls_count'),
            F.count('*').alias('total_count')
        ) \
        .withColumn('nulls_percentage', F.col('nulls_count') / F.col('total_count') * 100) \
        .orderBy('year') \
        .toPandas()
    
    hypotheses_results['nulls_trend'] = nulls_by_year
    
    # Визуализация
    if not nulls_by_year.empty:
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(nulls_by_year['year'], nulls_by_year['nulls_percentage'], 
                marker='o', linewidth=2, markersize=8)
        ax.set_xlabel('Год')
        ax.set_ylabel('Процент пропусков (%)')
        ax.set_title('Динамика пропусков в данных по годам')
        ax.grid(True, alpha=0.3)
        plt.show()
    
    # Гипотеза 2: Единицы измерения согласованы для каждого показателя
    units_check = df.groupBy('indicator_code') \
        .agg(F.countDistinct('indicator_unit').alias('unique_units')) \
        .filter(F.col('unique_units') > 1) \
        .count()
    
    hypotheses_results['inconsistent_units'] = units_check > 0
    
    print("\n" + "="*50)
    print("РЕЗУЛЬТАТЫ ПРОВЕРКИ ГИПОТЕЗ")
    print("="*50)
    print(f"Наличие несогласованных единиц измерения: {hypotheses_results['inconsistent_units']}")
    
    return hypotheses_results

# Выполнение анализа гипотез
print("\n" + "="*50)
print("АНАЛИЗ ГИПОТЕЗ")
print("="*50)

# Корреляционный анализ (осторожно с большими данными)
try:
    corr_results = analyze_correlations(sdf)
    if corr_results is not None:
        print("Корреляционный анализ выполнен успешно")
except Exception as e:
    print(f"Корреляционный анализ пропущен: {e}")

# Проверка гипотез о качестве данных
quality_results = check_data_quality_hypothesis(sdf)

In [None]:
# =============================================================================
# ИТОГОВЫЙ ОТЧЕТ
# =============================================================================

def generate_summary_report(df, basic_stats, desc_stats):
    """
    Генерация итогового отчета.
    """
    print("\n" + "="*60)
    print("ИТОГОВЫЙ АНАЛИТИЧЕСКИЙ ОТЧЕТ")
    print("="*60)
    
    print("\n1. ОБЩАЯ ИНФОРМАЦИЯ:")
    print(f"   - Объем данных: {basic_stats['total_records']:,} записей")
    print(f"   - Период: {basic_stats['year_range']['min_year']}-{basic_stats['year_range']['max_year']}")
    print(f"   - Регионов: {basic_stats['unique_regions']}")
    print(f"   - Муниципалитетов: {basic_stats['unique_municipalities']}")
    print(f"   - Показателей: {basic_stats['unique_indicators']}")
    
    print("\n2. КАЧЕСТВО ДАННЫХ:")
    print(f"   - Пропуски в ключевых полях: {basic_stats['null_values']}")
    
    print("\n3. КЛЮЧЕВЫЕ ВЫВОДЫ:")
    
    # Анализ динамики
    yearly_growth = df.groupBy('year') \
        .agg(F.avg('indicator_value').alias('avg_value')) \
        .orderBy('year') \
        .collect()
    
    if len(yearly_growth) > 1:
        first_year = yearly_growth[0]['avg_value']
        last_year = yearly_growth[-1]['avg_value']
        growth = ((last_year - first_year) / first_year * 100) if first_year else 0
        print(f"   - Общий рост показателей за период: {growth:.1f}%")
    
    print("\n4. РЕКОМЕНДАЦИИ:")
    print("   - Проверить согласованность единиц измерения для всех показателей")
    print("   - Обратить внимание на регионы с наибольшим количеством пропусков")
    print("   - Рассмотреть возможность агрегации данных для ускорения анализа")
    
    # Освобождение кеша
    df.unpersist()

# Генерация итогового отчета
generate_summary_report(sdf, basic_stats, desc_stats)

print("\nАнализ завершен.")