# Импорт необходимых библиотек

In [1]:
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F
import logging
import os
from airflow.models import Variable
from dotenv import load_dotenv
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DataType
import requests
import json
import boto3
import minio

In [2]:
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Загрузка и проверка переменных окружения
load_dotenv()



def create_spark_session():
    try:
        spark = SparkSession.builder \
            .appName("MinIO Data Reader") \
            .config("spark.hadoop.fs.s3a.access.key", "app_user") \
            .config("spark.hadoop.fs.s3a.secret.key", "secure_password123") \
            .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
            .config("spark.hadoop.fs.s3a.path.style.access", "true") \
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
            .config("spark.jars", "/opt/jars/hadoop-aws-3.3.1.jar,/opt/jars/aws-java-sdk-bundle-1.11.901.jar") \
            .getOrCreate()

    
        logging.info('SparkSession успешно создана для работы с MinIO')
    except Exception as e:
        logging.error(f'SparkSession не создана по причине: {e}')
        raise
    return spark
    

In [3]:
def download_to_clickhouse():
    path = "s3a://weather/weather/weather_data.json"
    spark = create_spark_session()
    schema = StructType([
        StructField("latitude", DoubleType()),
        StructField("longitude", DoubleType()),
        StructField("timezone", StringType()),
        StructField("current_weather_units", StructType([
            StructField("time", StringType())
        ])),
        StructField("current_weather", StructType([
            StructField("time", StringType()),
            StructField("temperature", DoubleType()),
            StructField("windspeed", DoubleType())
    ]))
    ])
    raw_df = spark.read.json(path, schema=schema, multiLine=True)
    #raw_df.show(truncate=False)
    #raw_df.printSchema()
    flat_df = raw_df.select(
        "latitude", "longitude", "timezone",
        F.col("current_weather_units.time").alias("format_time"),
        F.col("current_weather.time").alias("time"),
        F.col("current_weather.temperature").alias("temperature"),
        F.col("current_weather.windspeed").alias("windspeed")
        
    )
    flat_df.show()
    return flat_df
download_to_clickhouse()    

[[34m2025-07-11T20:09:15.610+0000[0m] {[34m4075142977.py:[0m26} INFO[0m - SparkSession успешно создана для работы с MinIO[0m
+--------+---------+--------+-----------+----------------+-----------+---------+
|latitude|longitude|timezone|format_time|            time|temperature|windspeed|
+--------+---------+--------+-----------+----------------+-----------+---------+
|   55.75|   37.625|     GMT|    iso8601|2025-07-11T20:00|       27.5|      3.9|
+--------+---------+--------+-----------+----------------+-----------+---------+



DataFrame[latitude: double, longitude: double, timezone: string, format_time: string, time: string, temperature: double, windspeed: double]

In [None]:
import requests

try:
    response = requests.get('http://minio:9000')
    print('Response status:', response.status_code)
except Exception as e:
    print('Request error:', e)


In [None]:
def get_api_data():
    try:
        response = requests.get("https://api.open-meteo.com/v1/forecast",
        params={
                "latitude": 55.75,
                "longitude": 37.62,
                "current_weather": True
                }
        )
        if response.status_code == 200:
            data = response.json()
            json_str = json.dumps(data, ensure_ascii=False, indent=2)
            logging.info('Данные успешно загружены')
            return json_str
        else:
            logging.error(f'Error: {response.status_code} - Данные не загружены')
            return None
    except:
        logging.error(f'Error: Данные не загружены')
        raise

get_api_data()

In [1]:
from pyspark.sql import SparkSession

jars = [
    "/home/jovyan/jars/hadoop-aws-3.3.1.jar",
    "/home/jovyan/jars/aws-java-sdk-bundle-1.11.901.jar",
    "/home/jovyan/jars/clickhouse-jdbc-0.3.2.jar"
]

spark = SparkSession.builder \
    .appName("Test SparkSession") \
    .config("spark.jars", ",".join(jars)) \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

print("SparkSession создана")
print("Подключенные JAR-файлы:", spark.sparkContext.getConf().get("spark.jars"))

SparkSession создана
Подключенные JAR-файлы: /home/jovyan/jars/hadoop-aws-3.3.1.jar,/home/jovyan/jars/aws-java-sdk-bundle-1.11.901.jar,/home/jovyan/jars/clickhouse-jdbc-0.3.2.jar


# Подключение к Minio через библиотеку boto3 

In [None]:
load_dotenv()

# Получаем переменные
endpoint = os.getenv("MINIO_ENDPOINT") # для airflow нужно будет переписать чтобы значения брались из Variables
access_key = os.getenv("MINIO_ACCESS_KEY")
secret_key = os.getenv("MINIO_SECRET_KEY")

# Подключаемся к MinIO
s3 = boto3.client(
    's3',
    endpoint_url=f"http://{endpoint}",
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key

)

# Проверяем список бакетов
try:
    buckets = s3.list_buckets()
    print("Подключение к MinIO через boto3 успешно. Список бакетов:")
    for b in buckets['Buckets']:
        print(f"  - {b['Name']}")
except Exception as e:
    print("Ошибка подключения к MinIO:", e)

# **Загрузка через boto3**

In [None]:
from airflow.hooks.base import BaseHook
import boto3

def upload_to_minio():
    conn = BaseHook.get_connection('minio_default')
    extra = conn.extra_dejson  # Параметры из Extra
    
    s3 = boto3.client(
        's3',
        endpoint_url=extra['endpoint_url'],
        aws_access_key_id=extra['aws_access_key_id'],
        aws_secret_access_key=extra['aws_secret_access_key']
    )
    
    s3.upload_file('/local/path/file.txt', 'bucket-name', 'file.txt')

# Подключение к Minio через библиотеку Minio 

In [None]:
client = Minio(
    endpoint=os.getenv("MINIO_ENDPOINT"), # для airflow нужно будет переписать чтобы значения брались из Variables
    access_key=os.getenv("MINIO_ACCESS_KEY"),
    secret_key=os.getenv("MINIO_SECRET_KEY"),
    secure=False  
)

# Проверяем список бакетов
try:
    buckets = client.list_buckets()
    print("Подключение к MinIO через MinIO успешно. Список бакетов:")
    for b in buckets:
        print(f' - {b.name}')
except Exception as e:
    print("Ошибка подключения к MinIO:", e)

# Проверка SparkSession 

In [None]:
spark = SparkSession.builder \
    .appName("MinIO Test") \
    .getOrCreate()

df = spark.range(10)
df.show()

# Запуск SparkSession с различными параметрами 

**Имя приложения, которое будет отражено в Spark UI и логах**

appName("SparkProject")

**Локальный режим (может быть yarn, mesos, kubernetes)**

master("local[*]") 

**Количество партиций, используемых при операциях shuffle (например, groupBy, join). По умолчанию = 200.
Для небольших наборов данных можно уменьшить, чтобы не было лишних партиций.**

config("spark.sql.shuffle.partitions", "200") 

**Объем памяти, выделяемый каждому executor'у. Чем больше данных — тем больше памяти потребуется.**

config("spark.executor.memory", "4g")

**Объем памяти для драйвера — основной управляющий процесс, запускающий задачи и отслеживающий их выполнение.**

config("spark.driver.memory", "2g") 

**Количество ядер CPU, которые каждый executor может использовать. Больше ядер — больше параллелизма, но стоит учитывать общую нагрузку.**

config("spark.executor.cores", "4")

**Включает динамическое выделение executors в зависимости от загрузки. Позволяет Spark автоматически масштабироваться: добавлять и убирать executors в зависимости от нужд приложения. Требует настройки shuffle service.**

config("spark.dynamicAllocation.enabled", "true")

**Указывает директорию для checkpoint'ов (резервного сохранения состояния, особенно в streaming-приложениях).**

config("spark.checkpoint.dir", "/path/to/checkpoint/dir")

**Путь к директории, где Spark SQL будет хранить управляемые таблицы (при использовании saveAsTable, например).**

config("spark.sql.catalogImplementation", "hive")


**Указывает, что Spark должен использовать Hive Catalog (а не встроенный In-Memory метастор). Требует наличие Hive или его имитации (например, Derby DB).**

.enableHiveSupport()


In [None]:
# spark = SparkSession.builder \
#     .appName("SparkProject") \ 
#     .master("local[*]") \ 
#     .config("spark.sql.shuffle.partitions", "200") \ 
#     .config("spark.executor.memory", "4g") \ 
#     .config("spark.driver.memory", "2g") \ 
#     .config("spark.executor.cores", "4") \ 
#     .config("spark.dynamicAllocation.enabled", "true") \ 
#     .config("spark.checkpoint.dir", "/path/to/checkpoint/dir") \ 
#     .config("spark.sql.warehouse.dir", "/path/to/warehouse/dir") \ 
#     .config("spark.sql.catalogImplementation", "hive") \ 
#     .getOrCreate()

# Получаем данные по API

In [None]:
def get_api_data(env_name):
    load_dotenv()
    # Получаем ссылку из файла .env
    api_url = os.getenv(env_name) # для airflow нужно будет переписать чтобы значения брались из Variables
    # Проверка наличия ссылки
    if not api_url:
        logging.error(f'Указанный URL не задан в файле .env') # для airflow нужно будет переписать чтобы значения брались из Variables
        return None
    try:
        response = requests.get(api_url)
        if response.status_code == 200:
            data = response.json()
            json_str = json.dumps(data, ensure_ascii=False, indent=2)
            logging.info('Данные успешно загружены')
            return json_str
        else:
            logging.error(f'Error: {response.status_code} - Данные не загружены')
            return None
    except:
        logging.error(f'Error{e} - Данные не загружены')
        raise



# Сохраняем JSON в MinIO

In [None]:
def save_json_to_minio(env_name):
    data = get_api_data(env_name)
    
# Написать функцию, чтобы записать в Minio JSON
# Проверить в какой бакет пишем перезаписью
# Через Spark читать файл
    

In [None]:
spark = SparkSession.builder \
    .appName("SparkProject") \ 
    .master("local[*]") \ 
    .config("spark.sql.shuffle.partitions", "200") \ 
    .config("spark.driver.memory", "2g") \ 
    .config("spark.dynamicAllocation.enabled", "true") \ 
    .getOrCreate()