In [1]:
import os
import warnings

# S3/MinIO connection settings come from the environment. When a variable is
# absent the placeholder below is stored instead, exactly as before, so later
# cells still see a string value. A warning names the missing variables,
# because the placeholder is otherwise handed to Spark as if it were a real
# credential/endpoint and the failure only surfaces at write time.
_S3_ENV_PLACEHOLDER = "Не задано"
_S3_ENV_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_S3_ENDPOINT")

_missing_s3_vars = [name for name in _S3_ENV_VARS if name not in os.environ]
if _missing_s3_vars:
    # Only variable names are reported; secret values are never printed.
    warnings.warn(
        "Environment variables not set: "
        + ", ".join(_missing_s3_vars)
        + "; a placeholder value is used instead and S3 access is expected to fail."
    )

access_key = os.getenv("AWS_ACCESS_KEY_ID", _S3_ENV_PLACEHOLDER)
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY", _S3_ENV_PLACEHOLDER)
s3_endpoint = os.getenv("AWS_S3_ENDPOINT", _S3_ENV_PLACEHOLDER)


In [2]:
from pyspark.sql import SparkSession

# Build the SparkSession. All settings are collected in one mapping and
# applied in order, so the configuration can be read as a table.
# NOTE(review): Hadoop S3A documents the region option as
# "fs.s3a.endpoint.region"; confirm that "fs.s3a.region" is honoured by the
# Hadoop version in use.
spark_settings = {
    "spark.sql.shuffle.partitions": "2",
    "spark.hadoop.fs.s3a.access.key": access_key,
    "spark.hadoop.fs.s3a.secret.key": secret_key,
    "spark.hadoop.fs.s3a.endpoint": s3_endpoint,
    "spark.hadoop.fs.s3a.connection.maximum": "100",
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.region": "us-east-1",
}

session_builder = SparkSession.builder.appName("BostonCrimeAnalysis")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

# Smoke test: a trivial job confirms the session can actually run work.
spark.range(3).show()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/12 12:18:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 0:>                                                        (0 + 12) / 12]

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+



                                                                                

In [3]:
# Load the raw crimes CSV: the first line is treated as a header and column
# types are inferred from the data.
input_path = '/data/crime.csv'
crimes_df = spark.read.csv(input_path, header=True, inferSchema=True)

# Inspect the inferred schema and a small sample of rows.
crimes_df.printSchema()
crimes_df.show(n=5)

                                                                                

root
 |-- INCIDENT_NUMBER: string (nullable = true)
 |-- OFFENSE_CODE: integer (nullable = true)
 |-- OFFENSE_CODE_GROUP: string (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- REPORTING_AREA: string (nullable = true)
 |-- SHOOTING: string (nullable = true)
 |-- OCCURRED_ON_DATE: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- UCR_PART: string (nullable = true)
 |-- STREET: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Location: string (nullable = true)

+---------------+------------+--------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------+-----------+------------+--------------------+
|INCIDENT_NUMBER|OFFENSE_CODE|  OFFENSE_CODE_GROUP| OFFENSE_

In [8]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from functools import reduce  # Добавлен импорт reduce

def clean_boston_crime_data(df: DataFrame) -> DataFrame:
    """
    Clean the Boston crime dataset.

    Steps, applied in this order:
    - Fill missing 'SHOOTING' values with the string 'False'.
    - Parse 'OCCURRED_ON_DATE' into a timestamp using 'yyyy-MM-dd HH:mm:ss'.
    - Derive 'Lat' and 'Long' from the 'Location' string (any existing
      columns with those names are replaced).
    - Keep only rows whose coordinates lie in the valid ranges.
    - Drop duplicates by 'INCIDENT_NUMBER'.
    - Keep only rows whose 'UCR_PART' is one of the allowed values.
    - Drop rows with nulls in 'UCR_PART', 'Lat', 'Long' or 'DISTRICT'.

    Parameters:
    df (DataFrame): Source DataFrame with the crime records.

    Returns:
    DataFrame: The cleaned DataFrame.
    """

    # Fill gaps in SHOOTING with the string 'False'.
    df_cleaned = df.fillna({'SHOOTING': 'False'})

    # Convert OCCURRED_ON_DATE from string to timestamp.
    df_cleaned = df_cleaned.withColumn(
        "OCCURRED_ON_DATE",
        F.to_timestamp("OCCURRED_ON_DATE", "yyyy-MM-dd HH:mm:ss"),
    )

    # Split Location ("(<lat>, <long>)") into numeric Lat and Long columns.
    # NOTE(review): a Location that does not match yields an empty string,
    # which presumably casts to null under default (non-ANSI) settings and is
    # then removed by the range filter below - confirm if ANSI mode is enabled.
    df_cleaned = df_cleaned.withColumn(
        "Lat", F.regexp_extract("Location", r"\((.*),", 1).cast("double")
    ).withColumn(
        "Long", F.regexp_extract("Location", r", (.*)\)", 1).cast("double")
    )

    # Validate coordinates: latitude in [-90, 90], longitude in [-180, 180].
    df_cleaned = df_cleaned.filter(
        (F.col("Lat") >= -90) & (F.col("Lat") <= 90) &
        (F.col("Long") >= -180) & (F.col("Long") <= 180)
    )

    # Remove duplicate incidents.
    df_cleaned = df_cleaned.dropDuplicates(subset=["INCIDENT_NUMBER"])

    # Keep only the allowed UCR_PART categories.
    valid_ucr_parts = ["Part One", "Part Two", "Part Three", "Other"]
    df_cleaned = df_cleaned.filter(F.col("UCR_PART").isin(valid_ucr_parts))

    # Drop rows with a null in any of the key columns. This is evaluated
    # against the cleaned frame: the previous version built the condition from
    # the *input* frame's columns, which no longer match the re-derived
    # Lat/Long columns of df_cleaned.
    columns_to_check = ['UCR_PART', 'Lat', 'Long', 'DISTRICT']
    df_cleaned = df_cleaned.dropna(subset=columns_to_check)

    return df_cleaned

In [7]:
# Run the cleaning step on the raw frame and preview the first 20 rows
# without truncating long column values.
df_cleaned = clean_boston_crime_data(crimes_df)
df_cleaned.show(n=20, truncate=False)

NameError: name 'clean_boston_crime_data' is not defined

In [4]:
from pyspark.sql.window import Window

def create_bi_dataset(df):
    """
    Build the per-district BI data mart from the cleaned crimes DataFrame.

    The input must contain the columns 'DISTRICT', 'YEAR', 'MONTH',
    'OFFENSE_DESCRIPTION', 'Lat' and 'Long'.

    Returns a DataFrame with one row per district and the columns:
    - DISTRICT
    - crimes_total: total number of crimes in the district
    - crimes_monthly: approximate median of the monthly crime counts
    - frequent_crime_types: the three most frequent crime types joined with
      ', ', in descending order of frequency
    - lat, lng: average latitude and longitude of the district's incidents
    """
    # crime_type is the part of OFFENSE_DESCRIPTION before the first ' - '.
    df = df.withColumn('crime_type', F.split(df['OFFENSE_DESCRIPTION'], ' - ')[0])

    # Total number of crimes per district.
    crimes_total = df.groupBy('DISTRICT').agg(F.count('*').alias('crimes_total'))

    # Median (approximate) of the number of crimes per month in each district.
    crimes_monthly = df.groupBy('DISTRICT', 'YEAR', 'MONTH').agg(F.count('*').alias('monthly_crimes'))
    crimes_monthly = crimes_monthly.groupBy('DISTRICT').agg(
        F.expr('percentile_approx(monthly_crimes, 0.5)').alias('crimes_monthly')
    )

    # Three most frequent crime types per district.
    # crime_type is a secondary sort key so that ties in the count are broken
    # deterministically instead of depending on partition order.
    rank_window = Window.partitionBy('DISTRICT').orderBy(
        F.desc('crime_type_count'), F.asc('crime_type')
    )
    top_crime_types = (
        df.groupBy('DISTRICT', 'crime_type')
        .agg(F.count('*').alias('crime_type_count'))
        .withColumn('rank', F.row_number().over(rank_window))
        .filter(F.col('rank') <= 3)
    )
    # collect_list gives no ordering guarantee after a groupBy, so collect
    # (rank, crime_type) pairs and sort them by rank before joining the names.
    frequent_crime_types = (
        top_crime_types.groupBy('DISTRICT')
        .agg(F.sort_array(F.collect_list(F.struct('rank', 'crime_type'))).alias('ranked_types'))
        .select(
            'DISTRICT',
            F.concat_ws(', ', F.col('ranked_types.crime_type')).alias('frequent_crime_types'),
        )
    )

    # Average latitude and longitude per district.
    avg_coordinates = df.groupBy('DISTRICT').agg(
        F.avg('Lat').alias('lat'),
        F.avg('Long').alias('lng')
    )

    # Combine all aggregates into a single mart keyed by DISTRICT.
    dashboard = crimes_total.join(crimes_monthly, on='DISTRICT', how='inner') \
                            .join(frequent_crime_types, on='DISTRICT', how='inner') \
                            .join(avg_coordinates, on='DISTRICT', how='inner')

    return dashboard

In [6]:
# Build the BI data mart from the cleaned data and preview its first rows.
bi_dataset = create_bi_dataset(df_cleaned)
bi_dataset.show(n=5)

NameError: name 'df_cleaned' is not defined

In [5]:
# Destination of the BI mart in the S3-compatible (MinIO) store.
path = "output/bi_data.parquet"
bucket_name = "spark-data"
output_path = f"s3a://{bucket_name}/{path}"

try:
    # "overwrite" makes the cell re-runnable: the default save mode raises
    # when the target path already exists from a previous run.
    bi_dataset.write.mode("overwrite").parquet(output_path)
except Exception as e:
    # Log the context, then re-raise so the cell visibly fails instead of
    # reporting unrelated problems (e.g. an undefined variable) as a storage
    # error and letting the notebook continue as if the data were saved.
    print(f"Error occurred while saving to MinIO: {e}")
    raise
else:
    print(f"Data successfully uploaded to {output_path}")
        

Error occurred while saving to MinIO: name 'bi_dataset' is not defined
