In [4]:
import requests
import pandas as pd
import scrapy as sc

from pathlib import Path
import os
import zipfile

import time
import re

from clickhouse_driver import Client

from loguru import logger

#Перманентно поменял настройки вывода графиков
from matplotlib import pylab
from pylab import *
pylab.rcParams['figure.figsize'] = (18.0, 6.0)
plt.rcParams.update({'font.size': 13})

#Скрыл вывод предупреждений.
import warnings
warnings.filterwarnings('ignore') #чтобы вернуть: (action='once')

# Глобально снял ограничение на кол-во отображаемых результатов для каждой ячейки ввода кода.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Включил возможность форматировать стили текста с помощью метода printmd()
from IPython.display import Markdown, display, HTML
def printmd(string):
    display(Markdown(string))
    
# Снял ограничение на вывод кол-ва столбцов и ширины колонки.
pd.set_option('display.max_columns', 0)
pd.set_option('display.max_colwidth', -1)

# Установил формат вывода в таблице на 2 знака после запятой.
pd.options.display.float_format = '{:,.3f}'.format
    
# Добавил функцию вывода таблиц в одну строку, для экономии пространства и улучшения восприятия информации.
def display_side_by_side(dfs:list, captions:list):
    output = ""
    combined = dict(zip(captions, dfs))
    for caption, df in combined.items():
        output += df.style.set_table_attributes("style='display:inline'").set_caption(caption)._repr_html_()
        output += "\xa0\xa0\xa0"
    display(HTML(output))

In [2]:
#Настроил логер
logger.add('logs/logs.log', format="{time} {level} {message}", filter="my_module", level="INFO")

1

### Extract

In [7]:
#Сделал запрос на скачивание xml-файла из s3 бакета со списком всех zip архивов.
test_req = requests.get('https://s3.amazonaws.com/tripdata')

#Записал файл в pandas датафрейм.
tt = pd.read_xml(test_req.text)

In [9]:
#Отсеял все поля, которые не содержат значения Key(название zip-файла)
table_source = tt[tt['Key'] == tt['Key']].iloc[:, 3:-1].reset_index(drop=True)[:-1]
table_source.head(3)

Unnamed: 0,Key,LastModified,ETag,Size
0,201306-citibike-tripdata.zip,2018-04-30T13:18:55.000Z,"""b520a12de58eea58a3586f89bfcfbd9d-2""",16785103.0
1,201307-201402-citibike-tripdata.zip,2017-01-18T22:23:25.000Z,"""7b3b260b2ab2e5349320121d04bd821c-22""",178262576.0
2,201307-citibike-tripdata.zip,2017-01-18T22:23:27.000Z,"""dd3e6fd5f91715b31eae72868086c08c-4""",27074629.0


In [5]:
#Создал колонку с url
table_source['url'] =  table_source['Key'].apply(lambda x: 'https://s3.amazonaws.com/tripdata/' + str(x))
table_source.head(3)

Unnamed: 0,Key,LastModified,ETag,Size,url
0,201306-citibike-tripdata.zip,2018-04-30T13:18:55.000Z,"""b520a12de58eea58a3586f89bfcfbd9d-2""",16785103.0,https://s3.amazonaws.com/tripdata/201306-citibike-tripdata.zip
1,201307-201402-citibike-tripdata.zip,2017-01-18T22:23:25.000Z,"""7b3b260b2ab2e5349320121d04bd821c-22""",178262576.0,https://s3.amazonaws.com/tripdata/201307-201402-citibike-tripdata.zip
2,201307-citibike-tripdata.zip,2017-01-18T22:23:27.000Z,"""dd3e6fd5f91715b31eae72868086c08c-4""",27074629.0,https://s3.amazonaws.com/tripdata/201307-citibike-tripdata.zip


In [11]:
logger.info('Сравнение таблицы исходника с актуальной версией')
#Файл etl_log.csv хранит актуальную историю последней выгрузки. 

#Если файл уже существует, то скрипт открывает его и удаляет колонку update
if os.path.exists('etl_log.csv'):
    logger.info('Таблица загрузчика существует.')
    
    table_actual = pd.read_csv('etl_log.csv',sep=";")
    del table_actual['update'] #колонка update указывает следует ли загружать файл
    
    #Объединяются 2 таблицы для построчного сравнения. В результате создается колонка обноваленная колонка update
    table_final=table_source.merge(table_actual,indicator=True,how='left')
    table_final._merge=table_final._merge.eq('both')
    table_final = table_final.rename(columns={'_merge':'update'})
    table_final['update'] = ~table_final['update']

    #Подсчитывается количество изменений и выводиться соответствующее сообщение.
    changes = table_final[table_final['update'] != False]
    
    if len(changes) == 0:
        logger.info('Изменений нет.')
    else:
        logger.info(len(changes) + 'изменений найдено.')
#Если файл не существует, то создает его с указанными по умолчанию значениями.
else:
    table_source['downloaded'] = False #метка был ли скачан файл.
    table_source['extracted'] = False #метка были ли разархивирован файл.
    table_source['ch_uploaded'] = False #метка был ли файл загружен в БД clickhouse.
    table_source['csv'] = 'No' #название csv файла при его существовании.
    
    table_source.to_csv('etl_log.csv',sep=";",index=False)
    
    logger.info('Файл таблицы создан.')

2022-09-14 23:52:28.771 | INFO     | __main__:<cell line: 1>:1 - Сравнение таблицы исходника с актуальной версией
2022-09-14 23:52:28.775 | INFO     | __main__:<cell line: 5>:6 - Таблица загрузчика существует.
2022-09-14 23:52:28.804 | INFO     | __main__:<cell line: 5>:18 - Изменений нет.


In [15]:
table_final.head(3)

Unnamed: 0,Key,LastModified,ETag,Size,url,downloaded,extracted,ch_uploaded,csv,update
0,201306-citibike-tripdata.zip,2018-04-30T13:18:55.000Z,"""b520a12de58eea58a3586f89bfcfbd9d-2""",16785103.0,https://s3.amazonaws.com/tripdata/201306-citibike-tripdata.zip,True,True,True,201306-citibike-tripdata.csv,False
1,201307-201402-citibike-tripdata.zip,2017-01-18T22:23:25.000Z,"""7b3b260b2ab2e5349320121d04bd821c-22""",178262576.0,https://s3.amazonaws.com/tripdata/201307-201402-citibike-tripdata.zip,True,True,True,2013-07 - Citi Bike trip data.csv,False
2,201307-citibike-tripdata.zip,2017-01-18T22:23:27.000Z,"""dd3e6fd5f91715b31eae72868086c08c-4""",27074629.0,https://s3.amazonaws.com/tripdata/201307-citibike-tripdata.zip,True,True,True,2013-07 - Citi Bike trip data.csv,False


In [22]:
# Методы сохранения файлов
def get_file_file(url): #делаем запрос на сам файл
    try:
        r = requests.get(url, stream=True)
        return r
    except:
        print('Нет коннекта при соединении с файлом по указанной ссылке.')
        logger.info('Нет коннекта при соединении с файлом по указанной ссылке.')
        pass


def get_file_name(url): #получаем имя файла
    try:
        name = url.split('/')[-1]  #потрошим ссылку через / и берём оттуда последние данные
        return name
    except:
        return url

def save_file(name, file_object): #сохраняет файл в корневой папке
    try:
        path = './files/'+name
        with open(path, 'bw') as f:
            for chunk in file_object.iter_content(None):
                f.write(chunk)
            print(str(name)+' загружен')
            logger.info(str(name)+' загружен')
    except:
        pass
    time.sleep(5)

In [23]:
# Директория сохранения файлов
dir_path = r'./files/'

# Список файлов в директории
res = os.listdir(dir_path)

In [25]:
#Функция извлекает csv-файл из архива в путь ./files/csv/, принимает атрибуты: имя файла, индекс.
def zip_extractor(target_file, index):
    endswith = '.csv'
    try:
        path = Path(target_file)
        target_file_size = path.stat().st_size
        
        with zipfile.ZipFile(target_file) as z:
            for file in z.namelist():
                if file.endswith(endswith):
                    if (target_file_size) > 0:
                        logger.info("Извлекаемый из архива файл существует: "+file)
                    else:
                        z.extract(file,'./files/csv/')
                logger.info(file+" файл разархивирован")
                if file[1] != "_":
                    table_final.at[index, 'csv'] = file
        table_final.at[index, 'extracted'] = True
    except ValueError as ve:
        logger.info(ve)

In [33]:
#Итератор проходит по созданной ранее таблице с актуальной версией инструкций экстракции. 
for index, row in table_final.iterrows():
    file = row['url']
    filename = file.split('/')[-1]
    try:
        path = Path(dir_path+filename)
        target_file_size = path.stat().st_size
    except:
        logger.info("Файл скачивается: "+file)
        save_file(get_file_name(file), get_file_file(file))
    else:
        if (target_file_size) > 0:
            logger.info("Файл существует: "+filename)
        else:
            logger.info("Файл скачан не полностью, продолжается скачивание: "+file)
            save_file(get_file_name(file), get_file_file(file))
            
    table_final.at[index, 'downloaded'] = True
    zip_extractor('.\\'+str(path),index)

2022-09-08 23:57:01.695 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 201306-citibike-tripdata.zip
2022-09-08 23:57:01.696 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 201306-citibike-tripdata.csv
2022-09-08 23:57:01.697 | INFO     | __main__:zip_extractor:14 - 201306-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.697 | INFO     | __main__:zip_extractor:14 - __MACOSX/ файл разархивирован
2022-09-08 23:57:01.698 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: __MACOSX/._201306-citibike-tripdata.csv
2022-09-08 23:57:01.698 | INFO     | __main__:zip_extractor:14 - __MACOSX/._201306-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.699 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 201307-201402-citibike-tripdata.zip
2022-09-08 23:57:01.701 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 2014-02 - Citi Bike trip data.csv
2022-09-08 23:57:01.

2022-09-08 23:57:01.755 | INFO     | __main__:zip_extractor:14 - 201409-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.756 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 201410-citibike-tripdata.zip
2022-09-08 23:57:01.757 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 201410-citibike-tripdata.csv
2022-09-08 23:57:01.757 | INFO     | __main__:zip_extractor:14 - 201410-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.759 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 201411-citibike-tripdata.zip
2022-09-08 23:57:01.760 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 201411-citibike-tripdata.csv
2022-09-08 23:57:01.760 | INFO     | __main__:zip_extractor:14 - 201411-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.761 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 201412-citibike-tripdata.zip
2022-09-08 23:57:01.762 | INFO     | __main__:zip_extrac

2022-09-08 23:57:01.811 | INFO     | __main__:zip_extractor:14 - 201608-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.812 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 201609-citibike-tripdata.zip
2022-09-08 23:57:01.813 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 201609-citibike-tripdata.csv
2022-09-08 23:57:01.814 | INFO     | __main__:zip_extractor:14 - 201609-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.815 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 201610-citibike-tripdata.zip
2022-09-08 23:57:01.816 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 201610-citibike-tripdata.csv
2022-09-08 23:57:01.816 | INFO     | __main__:zip_extractor:14 - 201610-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.817 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 201611-citibike-tripdata.zip
2022-09-08 23:57:01.818 | INFO     | __main__:zip_extrac

2022-09-08 23:57:01.867 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 201807-citibike-tripdata.csv
2022-09-08 23:57:01.868 | INFO     | __main__:zip_extractor:14 - 201807-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.869 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 201808-citibike-tripdata.csv.zip
2022-09-08 23:57:01.870 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 201808-citibike-tripdata.csv
2022-09-08 23:57:01.871 | INFO     | __main__:zip_extractor:14 - 201808-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.871 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 201809-citibike-tripdata.csv.zip
2022-09-08 23:57:01.872 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 201809-citibike-tripdata.csv
2022-09-08 23:57:01.873 | INFO     | __main__:zip_extractor:14 - 201809-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.874 | I

2022-09-08 23:57:01.925 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 202005-citibike-tripdata.csv.zip
2022-09-08 23:57:01.927 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 202005-citibike-tripdata.csv
2022-09-08 23:57:01.927 | INFO     | __main__:zip_extractor:14 - 202005-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.928 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 202006-citibike-tripdata.csv.zip
2022-09-08 23:57:01.929 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 202006-citibike-tripdata.csv
2022-09-08 23:57:01.929 | INFO     | __main__:zip_extractor:14 - 202006-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.930 | INFO     | __main__:zip_extractor:14 - __MACOSX/ файл разархивирован
2022-09-08 23:57:01.930 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: __MACOSX/._202006-citibike-tripdata.csv
2022-09-08 23:57:01.931 | INFO     

2022-09-08 23:57:01.974 | INFO     | __main__:zip_extractor:14 - __MACOSX/._202108-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.975 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 202109-citibike-tripdata.csv.zip
2022-09-08 23:57:01.976 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 202109-citibike-tripdata.csv
2022-09-08 23:57:01.977 | INFO     | __main__:zip_extractor:14 - 202109-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.977 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: __MACOSX/._202109-citibike-tripdata.csv
2022-09-08 23:57:01.978 | INFO     | __main__:zip_extractor:14 - __MACOSX/._202109-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:01.979 | INFO     | __main__:<cell line: 1>:12 - Файл существует: 202110-citibike-tripdata.csv.zip
2022-09-08 23:57:01.980 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: 202110-citibike-tripdat

2022-09-08 23:57:02.022 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: JC-201512-citibike-tripdata.csv
2022-09-08 23:57:02.023 | INFO     | __main__:zip_extractor:14 - JC-201512-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.024 | INFO     | __main__:<cell line: 1>:12 - Файл существует: JC-201601-citibike-tripdata.csv.zip
2022-09-08 23:57:02.025 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: JC-20161-citibike-tripdata.csv
2022-09-08 23:57:02.025 | INFO     | __main__:zip_extractor:14 - JC-20161-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.026 | INFO     | __main__:<cell line: 1>:12 - Файл существует: JC-201602-citibike-tripdata.csv.zip
2022-09-08 23:57:02.027 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: JC-20162-citibike-tripdata.csv
2022-09-08 23:57:02.028 | INFO     | __main__:zip_extractor:14 - JC-20162-citibike-tripdata.csv файл разархивирован
2022-09

2022-09-08 23:57:02.082 | INFO     | __main__:zip_extractor:14 - JC-201710-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.083 | INFO     | __main__:<cell line: 1>:12 - Файл существует: JC-201711-citibike-tripdata.csv.zip
2022-09-08 23:57:02.084 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: JC-201711-citibike-tripdata.csv
2022-09-08 23:57:02.084 | INFO     | __main__:zip_extractor:14 - JC-201711-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.085 | INFO     | __main__:<cell line: 1>:12 - Файл существует: JC-201712-citibike-tripdata.csv.zip
2022-09-08 23:57:02.086 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: JC-201712-citibike-tripdata.csv
2022-09-08 23:57:02.086 | INFO     | __main__:zip_extractor:14 - JC-201712-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.087 | INFO     | __main__:<cell line: 1>:12 - Файл существует: JC-201801-citibike-tripdata.csv.zip
2022-09-08 23:57:02.

2022-09-08 23:57:02.135 | INFO     | __main__:<cell line: 1>:12 - Файл существует: JC-201909-citibike-tripdata.csv.zip
2022-09-08 23:57:02.136 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: JC-201909-citibike-tripdata.csv
2022-09-08 23:57:02.136 | INFO     | __main__:zip_extractor:14 - JC-201909-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.137 | INFO     | __main__:<cell line: 1>:12 - Файл существует: JC-201910-citibike-tripdata.csv.zip
2022-09-08 23:57:02.138 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: JC-201910-citibike-tripdata.csv
2022-09-08 23:57:02.138 | INFO     | __main__:zip_extractor:14 - JC-201910-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.139 | INFO     | __main__:<cell line: 1>:12 - Файл существует: JC-201911-citibike-tripdata.csv.zip
2022-09-08 23:57:02.141 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: JC-201911-citibike-tripdata.csv
2

2022-09-08 23:57:02.183 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: __MACOSX/._JC-202104-citibike-tripdata.csv
2022-09-08 23:57:02.183 | INFO     | __main__:zip_extractor:14 - __MACOSX/._JC-202104-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.184 | INFO     | __main__:<cell line: 1>:12 - Файл существует: JC-202105-citibike-tripdata.csv.zip
2022-09-08 23:57:02.185 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: JC-202105-citibike-tripdata.csv
2022-09-08 23:57:02.186 | INFO     | __main__:zip_extractor:14 - JC-202105-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.186 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: __MACOSX/._JC-202105-citibike-tripdata.csv
2022-09-08 23:57:02.186 | INFO     | __main__:zip_extractor:14 - __MACOSX/._JC-202105-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.187 | INFO     | __main__:<cell line: 1>:12 - Файл существ

2022-09-08 23:57:02.227 | INFO     | __main__:zip_extractor:14 - JC-202205-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.228 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: __MACOSX/._JC-202205-citibike-tripdata.csv
2022-09-08 23:57:02.228 | INFO     | __main__:zip_extractor:14 - __MACOSX/._JC-202205-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.230 | INFO     | __main__:<cell line: 1>:12 - Файл существует: JC-202206-citibike-tripdata.csv.zip
2022-09-08 23:57:02.230 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: JC-202206-citibike-tripdata.csv
2022-09-08 23:57:02.231 | INFO     | __main__:zip_extractor:14 - JC-202206-citibike-tripdata.csv файл разархивирован
2022-09-08 23:57:02.231 | INFO     | __main__:zip_extractor:11 - Извлекаемый из архива файл существует: __MACOSX/._JC-202206-citibike-tripdata.csv
2022-09-08 23:57:02.232 | INFO     | __main__:zip_extractor:14 - __MACOSX/._JC-202206-cit

In [34]:
#Сохранение изменений инструкций экстракции.
table_final.to_csv('etl_log.csv',sep=";",index=False)

### Transform Load

In [37]:
#Функция просеивает данные из одного csv файла, группирует их по условию и загружает в БД в три разные таблицы. Принимает атрибуты: имя файла, индекс.
def load_trip(filename,index):
    try:
        #Читаем csv файл в Dataframe
        df = pd.read_csv('./files/csv/'+filename)
#         df.info()
        
        #Приводим колонки к одному виду
        df.columns = df.columns.str.replace(' ', '')
        df.columns = df.columns.str.lower()
        try:
            df['stop_date'] = pd.to_datetime(df['stoptime'])
        except:
            df = df.rename(columns={'ended_at':'stop_date','ride_id':'bikeid','':'tripduration','member_casual':'gender'})
            df['stop_date'] = pd.to_datetime(df['stop_date'])
            df['started_at'] = pd.to_datetime(df['started_at'])
            df['tripduration'] = (df['stop_date'] - df['started_at']) / np.timedelta64(1, 's')
            df = df.replace('member', 1).replace('casual', 2)
            pass
            
        #Преобразуем тип данных для работы со временем.
        df['stop_date'] = df['stop_date'].astype('datetime64[D]')
    #     df.head(5)
 
        #Группируем датафреймы по дням, следуя условиям задания.
        trips_count_daily = df.groupby('stop_date')['bikeid'].count().to_frame().reset_index().rename(columns={'bikeid':'trips_count','stop_date':'date'})
    #     trips_count_daily.head(10)
        avg_duration_daily = df.groupby('stop_date')['tripduration'].mean().to_frame().reset_index().rename(columns={'tripduration':'avg_trip_dur','stop_date':'date'})
    #     avg_duration_daily.head(10)
        gender_df = df.groupby('stop_date')['bikeid'].count().to_frame()
        

        for sex in df['gender'].unique():
            gender_df['gender_'+str(sex)] = df[df['gender'] == sex].groupby('stop_date')['gender'].count().to_frame()
        gender_daily = gender_df.reset_index().rename(columns={'stop_date':'date'}) 
        del gender_daily['bikeid']
        for sex in df['gender'].unique():
            gender_daily['gender_'+str(sex)] = gender_daily['gender_'+str(sex)].fillna(0).astype(int)  
        if 'gender_0' not in gender_daily.columns:
            gender_daily['gender_0'] = 0
    #     gender_daily.head(5)

        #Создаем коннект к Clickhouse. В clickhouse. Файлы DDL в папке проекта. Для таблиц использовались движки ReplacingMergeTree и AggregatingMergeTree
        client = Client(host='localhost', settings={'use_numpy': True}).from_url('clickhouse://default:@localhost:19000/default') #логин пароль порт по умолчанию, дефолтная БД тоже по умолчанию
    #     result = client.execute("SHOW TABLES")
    #     print(result)

        client.execute('INSERT INTO `default`.gender_daily VALUES', gender_daily.to_dict('records'),types_check=True)
        client.execute('OPTIMIZE TABLE `default`.gender_daily')
    #     client.query_dataframe('SELECT * FROM gender_daily')

        client.execute('INSERT INTO `default`.trips_count_daily VALUES', trips_count_daily.to_dict('records'),types_check=True)
        client.execute('OPTIMIZE TABLE `default`.trips_count_daily')
    #     client.query_dataframe('SELECT * FROM trips_count_daily')

        client.execute('INSERT INTO `default`.avg_duration_daily VALUES', avg_duration_daily.to_dict('records'),types_check=True)
        client.execute('OPTIMIZE TABLE `default`.avg_duration_daily')
    # client.query_dataframe('SELECT * FROM avg_duration_daily')
        table_final.at[index, 'ch_uploaded'] = True
        logger.info('Файл успешно загружен в БД: ' + filename)
    except ValueError as ve:
        logger.info(ve)

In [38]:
#Итератор выполняют функцию load_trip, тем самым обрабатывает и загружает подготовленные данные в БД.
for index, row in table_final.iterrows():
    uploaded = row['ch_uploaded']
    if uploaded != True:
        load_trip(row['csv'],index)
    else:
        logger.info('Датасет уже существует в БД: ' + row['csv'])

2022-09-08 23:57:41.139 | INFO     | __main__:<cell line: 1>:6 - Датасет уже существует в БД: 201306-citibike-tripdata.csv
2022-09-08 23:57:41.140 | INFO     | __main__:<cell line: 1>:6 - Датасет уже существует в БД: 2013-07 - Citi Bike trip data.csv
2022-09-08 23:57:41.141 | INFO     | __main__:<cell line: 1>:6 - Датасет уже существует в БД: 2013-07 - Citi Bike trip data.csv
2022-09-08 23:57:41.142 | INFO     | __main__:<cell line: 1>:6 - Датасет уже существует в БД: 2013-08 - Citi Bike trip data.csv
2022-09-08 23:57:41.142 | INFO     | __main__:<cell line: 1>:6 - Датасет уже существует в БД: 2013-09 - Citi Bike trip data.csv
2022-09-08 23:57:41.143 | INFO     | __main__:<cell line: 1>:6 - Датасет уже существует в БД: 2013-10 - Citi Bike trip data.csv
2022-09-08 23:57:41.143 | INFO     | __main__:<cell line: 1>:6 - Датасет уже существует в БД: 2013-11 - Citi Bike trip data.csv
2022-09-08 23:57:41.144 | INFO     | __main__:<cell line: 1>:6 - Датасет уже существует в БД: 2013-12 - Citi 

2022-09-09 00:12:01.996 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: 201901-citibike-tripdata.csv
2022-09-09 00:12:04.742 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: 201902-citibike-tripdata.csv
2022-09-09 00:12:08.737 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: 201903-citibike-tripdata.csv
2022-09-09 00:12:13.942 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: 201904-citibike-tripdata.csv
2022-09-09 00:12:19.651 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: 201905-citibike-tripdata.csv
2022-09-09 00:12:26.006 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: 201906-citibike-tripdata.csv
2022-09-09 00:12:32.475 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: 201907-citibike-tripdata.csv
2022-09-09 00:12:39.419 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: 201908-citibike-tripdata.csv
2022-09-09 00:12:46.510 | INFO     | __main__:load_trip:

2022-09-09 00:17:32.730 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: JC-201711-citibike-tripdata.csv
2022-09-09 00:17:32.822 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: JC-201712-citibike-tripdata.csv
2022-09-09 00:17:32.916 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: JC-201801-citibike-tripdata.csv
2022-09-09 00:17:33.011 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: JC-201802-citibike-tripdata.csv
2022-09-09 00:17:33.106 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: JC-201803-citibike-tripdata.csv
2022-09-09 00:17:33.212 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: JC-201804-citibike-tripdata.csv
2022-09-09 00:17:33.356 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: JC-201805-citibike-tripdata.csv
2022-09-09 00:17:33.529 | INFO     | __main__:load_trip:53 - Файл успешно загружен в БД: JC-201806-citibike-tripdata.csv
2022-09-09 00:17:33.714 | INFO  

In [39]:
#Сохранение изменений инструкций экстракции.
table_final.to_csv('etl_log.csv',sep=";",index=False)