# Скрипты для загрузки данных в БД PostgreeSQL

Скрипты для переноса датасетов в базу данных PostgreeSQL, развернутую в Yandex Cloud

## 1. Загрузка необходимых библиотек

In [1]:
import psycopg2
import pandas as pd

## 2. Настройка параметров подключения к БД

Секреты:

In [2]:
host = ''
dbname = ''
user = ''
password = ''

Параметры подключения:

In [3]:
conn_params = f"""
        host={host}
        port=6432
        sslmode=verify-full
        dbname={dbname}
        user={user}
        password={password}
        sslmode=require
        target_session_attrs=read-write
    """

## 3. Скрипты для взаимодействия с базой данных

### 3.1. Загрузка датасета с основной информацией о ДТП в БД

Создание таблицы (если не существует):

In [42]:
create_table_dtp_general_dataset_query = '''
CREATE TABLE IF NOT EXISTS dtp_general_dataset (
    id INTEGER,
    light VARCHAR(500),
    nearby VARCHAR(500),
    region VARCHAR(100),
    scheme VARCHAR(10),
    weather VARCHAR(100),
    category VARCHAR(500),
    datetime TIMESTAMP,
    severity VARCHAR(100),
    dead_count INTEGER,
    injured_count INTEGER,
    parent_region VARCHAR(100),
    road_conditions VARCHAR(500),
    participants_count INTEGER,
    participant_categories VARCHAR(500),
    lat FLOAT,
    long FLOAT,
    year INTEGER,
    month INTEGER,
    day INTEGER,
    hour INTEGER,
    light_day INTEGER,
    dark_light_on INTEGER,
    dark_no_light INTEGER,
    crossroads INTEGER,
    pedestrian_crossing INTEGER,
    regulated INTEGER,
    is_clear INTEGER,
    is_cloudy INTEGER,
    is_snowfall INTEGER,
    is_rain INTEGER,
    is_collision_with_human INTEGER,
    is_collision_with_cyclist INTEGER,
    ice_troubles INTEGER,
    participant_biker INTEGER,
    participant_pedestrian INTEGER,
    participant_cyclist INTEGER
);
'''

conn = psycopg2.connect(conn_params)
cur = conn.cursor()
cur.execute(create_table_dtp_general_dataset_query)
conn.commit()

Загрузим данные в БД из подготовленного датасета:

In [14]:
conn = psycopg2.connect(conn_params)
cur = conn.cursor()

try:
    with open('dtp_general_dataset.csv', 'r') as f:
            cur.copy_expert("COPY dtp_general_dataset FROM STDIN WITH CSV HEADER", f)
    
    conn.commit()
    print("Данные успешно загружены в базу данных.")

except Exception as e:
    print(f"Произошла ошибка: {e}")

finally:
    if cur:
        cur.close()
    if conn:
        conn.close()

Данные успешно загружены в базу данных.


In [12]:
df = pd.read_csv('dtp_general_dataset.csv')

In [13]:
df.shape

(1430278, 37)

### 3.2. Загрузка датасета с информацией о транспортных средствах - участниках ДТП

Создание таблицы (если не существует):

In [21]:
create_table_vehicles_data_query = '''
CREATE TABLE IF NOT EXISTS vehicles_data (
    id INTEGER,
    year INTEGER,
    brand VARCHAR(100),
    color VARCHAR(100),
    model VARCHAR(100),
    category VARCHAR(200),
    region VARCHAR(100),
    parent_region VARCHAR(100),
    datetime TIMESTAMP,
    age INTEGER
);
'''

conn = psycopg2.connect(conn_params)
cur = conn.cursor()
cur.execute(create_table_vehicles_data_query)
conn.commit()

Загрузим данные в БД из подготовленного датасета:

In [22]:
conn = psycopg2.connect(conn_params)
cur = conn.cursor()

try:
    with open('vehicles_data.csv', 'r') as f:
            cur.copy_expert("COPY vehicles_data FROM STDIN WITH CSV HEADER", f)
    
    conn.commit()
    print("Данные успешно загружены в базу данных.")

except Exception as e:
    print(f"Произошла ошибка: {e}")

finally:
    if cur:
        cur.close()
    if conn:
        conn.close()

Данные успешно загружены в базу данных.


### 3.3. Загрузка датасета с информацией об участниках ДТП

Создание таблицы (если не существует):

In [34]:
create_table_participants_data_query = '''
CREATE TABLE IF NOT EXISTS participants_data (
    id INTEGER,
    gender VARCHAR(20),
    years_of_driving_experience FLOAT,
    region VARCHAR(100),
    parent_region VARCHAR(100),
    datetime TIMESTAMP,
    violations VARCHAR(500),
    health_status VARCHAR(100),
    role VARCHAR(100)
);
'''

conn = psycopg2.connect(conn_params)
cur = conn.cursor()
cur.execute(create_table_participants_data_query)
conn.commit()

Загрузим данные в БД из подготовленного датасета:

In [35]:
conn = psycopg2.connect(conn_params)
cur = conn.cursor()

try:
    with open('participants_data.csv', 'r') as f:
            cur.copy_expert("COPY participants_data FROM STDIN WITH CSV HEADER", f)
    
    conn.commit()
    print("Данные успешно загружены в базу данных.")

except Exception as e:
    print(f"Произошла ошибка: {e}")

finally:
    if cur:
        cur.close()
    if conn:
        conn.close()

Данные успешно загружены в базу данных.


### 3.4. Создание материализованного представления для агрегированной информации о ДТП по регионам

Материализованное представление необходимо для ускорения работы дашборда за счет вычислений и агрегации датасета на стороне базы данных.

Запрос на создание материализованного представления создает таблицу содержащую:
* Региона (группировка)
* Год (группировка)
* Количество ДТП
* Количество пострадавших
* Количество участников
* Количество погибших
* Население региона

In [15]:
create_materialized_view_of_general_data_query = """
CREATE MATERIALIZED VIEW dtp_agregated_data AS
SELECT gdd.parent_region AS region,
       gdd.year,
       COUNT(DISTINCT gdd.id) AS dtp_amount,
       SUM(gdd.injured_count) AS injured_count,
       SUM(gdd.participants_count) AS participants_count,
       SUM(gdd.dead_count) AS dead_count,
       MAX(rp.population) AS population
FROM public.dtp_general_dataset gdd
LEFT JOIN public.regions_population rp ON gdd.parent_region = rp.region AND gdd."year" = rp."year"
GROUP BY gdd.parent_region, gdd.year;
"""

Выполнение запроса:

In [16]:
conn = psycopg2.connect(conn_params)
cur = conn.cursor()

try:
    # Выполняем SQL-запрос
    cur.execute(create_materialized_view_of_general_data_query)

    # Фиксируем изменения
    conn.commit()
    print("Материализованное представление успешно создано.")

except Exception as e:
    print(f"Произошла ошибка: {e}")

finally:
    # Закрываем курсор и соединение
    if cur:
        cur.close()
    if conn:
        conn.close()

Материализованное представление успешно создано.


### 3.5. Создание материализованного представления с возрастом ТС по регионам

Запрос на создание материализованного представления создает таблицу содержащую:

* Регион (группировка)
* Год (группировка)
* Средний возраст ТС

In [23]:
create_materialized_view_of_vehicle_age_by_regions_query = """
CREATE MATERIALIZED VIEW vehicle_age_by_regions AS
SELECT 
		DATE_TRUNC('year', datetime) AS "year", 
		parent_region, 
		AVG(age*1.0) AS avg_age
FROM public.vehicles_data
GROUP BY DATE_TRUNC('year', datetime), parent_region;
"""

Выполнение запроса:

In [24]:
conn = psycopg2.connect(conn_params)
cur = conn.cursor()

try:
    # Выполняем SQL-запрос
    cur.execute(create_materialized_view_of_vehicle_age_by_regions_query)

    # Фиксируем изменения
    conn.commit()
    print("Материализованное представление успешно создано.")

except Exception as e:
    print(f"Произошла ошибка: {e}")

finally:
    # Закрываем курсор и соединение
    if cur:
        cur.close()
    if conn:
        conn.close()

Материализованное представление успешно создано.


#### 3.5.1. Обновление материализованного представления с возрастом ТС по регионам

Запрос на обновление:

In [26]:
refresh_materialized_view_of_vehicle_age_by_regions_query = """
REFRESH MATERIALIZED VIEW vehicle_age_by_regions;
"""

Выполнение запроса:

In [27]:
conn = psycopg2.connect(conn_params)
cur = conn.cursor()

try:
    # Выполняем SQL-запрос
    cur.execute(refresh_materialized_view_of_vehicle_age_by_regions_query)

    # Фиксируем изменения
    conn.commit()
    print("Материализованное представление успешно обновлено.")

except Exception as e:
    print(f"Произошла ошибка: {e}")

finally:
    # Закрываем курсор и соединение
    if cur:
        cur.close()
    if conn:
        conn.close()

Материализованное представление успешно обновлено.


### 3.6. Создание материализованного представления с количеством конкретных нарушений в разбивке по регионам

Запрос на создание материализованного представления создает таблицу содержащую:

* Регион (группировка)
* Год (группировка)
* Категория нарушения (группировка)
* Количество нарушений

In [42]:
create_materialized_view_of_violations_count_by_regions_query = """
CREATE MATERIALIZED VIEW violations_agregated_by_region AS
SELECT 
	  DATE_TRUNC('year', datetime) AS "year",
	  parent_region,
	  violations,
	  COUNT(id) AS violations_count
FROM public.participants_data
WHERE violations <> 'unknown'
GROUP BY DATE_TRUNC('year', datetime), parent_region, violations;
"""

Выполнение запроса:

In [43]:
conn = psycopg2.connect(conn_params)
cur = conn.cursor()

try:
    # Выполняем SQL-запрос
    cur.execute(create_materialized_view_of_violations_count_by_regions_query)

    # Фиксируем изменения
    conn.commit()
    print("Материализованное представление успешно создано.")

except Exception as e:
    print(f"Произошла ошибка: {e}")

finally:
    # Закрываем курсор и соединение
    if cur:
        cur.close()
    if conn:
        conn.close()

Материализованное представление успешно создано.


### 3.6. Создание таблицы со справочником координат и геополигонов регионов РФ

Запрос на создание таблицы:

In [42]:
# Создание таблицы (если не существует)
create_table_query = '''
CREATE TABLE IF NOT EXISTS ru_regions (
    name VARCHAR(100),
    type VARCHAR(50),
    id INTEGER,
    region VARCHAR(100),
    coords_type	VARCHAR,
    coords TEXT
);
'''
cur.execute(create_table_query)
conn.commit()

Загрузка данных в таблицу:

In [43]:
# Вставка данных
for index, row in df.iterrows():
    cur.execute(
        "INSERT INTO ru_regions (name, type, id, region, coords_type, coords) VALUES (%s, %s, %s, %s, %s, %s)",
        (row['name'], row['type'], row['id'], row['region'], row['coords_type'], row['coords'])
    )

# Зафиксировать изменения
conn.commit()

# Закрытие соединения
cur.close()
conn.close()

print("Данные успешно перенесены в базу данных PostgreSQL.")

Данные успешно перенесены в базу данных PostgreSQL.


### 3.7. Создание таблицы со справочником населения регионов РФ

Запрос на создание таблицы:

In [62]:
# Создание таблицы (если не существует)
create_table_query = '''
CREATE TABLE IF NOT EXISTS regions_population (
    region VARCHAR(100),
    federal_region VARCHAR(50),
    year INTEGER,
    population INTEGER
);
'''
cur.execute(create_table_query)
conn.commit()

Загрузка данных в таблицу:

In [98]:
# Вставка данных
for index, row in flat_table.iterrows():
    cur.execute(
        "INSERT INTO regions_population (region, federal_region, year, population) VALUES (%s, %s, %s, %s)",
        (row['region'], row['federal_region'], row['year'], row['population'])
    )

# Зафиксировать изменения
conn.commit()

# Закрытие соединения
cur.close()
conn.close()

print("Данные успешно перенесены в базу данных PostgreSQL.")

Данные успешно перенесены в базу данных PostgreSQL.
