In [2]:
import boto3
import os
from config import ACCESS_KEY, SECRET_KEY, TOKEN
from collections import defaultdict
import datetime
import pytz

import pandas as pd
pd.set_option('display.max_colwidth', None)

token = TOKEN
headers={'Authorization':token,'Accept':'application/json'}

#'b1gb310irjlk6b99e14g' - аналитика
#'b1gc7vi2ckqausoc5dr7' - спутник

FOLDER_ID = 'b1gc7vi2ckqausoc5dr7' # id каталога из которого береться запрос
ACCESS_KEY = ACCESS_KEY #aws_access_key_id для S3
SECRET_KEY = SECRET_KEY #aws_secret_access_key в s3

In [4]:
def get_s3_instance():
    session = boto3.session.Session()
    return session.client(
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
        service_name='s3',
        endpoint_url='https://storage.yandexcloud.net'
    )

# Если нужны все файлы, то не прописываем Prefix то Prefix=False
def list_of_daily_objects_from_s3(bucket_name, Prefix):
    s3 = get_s3_instance()
    # Создаем пагинатор. Он нужен на тот случай, если файлов больше 1000
    paginator = s3.get_paginator('list_objects_v2')

    # Присваеваем пагинатор с параметрами. 
    if folder is False:
        page_iterator = paginator.paginate(Bucket=bucket_name)
    else:    
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=Prefix)

    df_obj = pd.DataFrame({
        'folder': [],
        'year': [],
        'month': [],
        'day': [],
        'key':[],
        'LastModified':[],
        }) # создаем пустой датасет для анализа

    df_obj = df_obj.astype(str)

    obj_number = 0 
    for page in page_iterator:
        for obj in page['Contents']: # list_objects - список объектов в бакете, Prefix - поиск во ключевому слову
            if len(obj['Key'].split(sep='/')) == 4 \
            and obj['Key'].split(sep='/')[3]!='':   # условия фильтрации
                df_obj.loc[obj_number,['folder']] = obj['Key'].split(sep='/')[0]
                df_obj.loc[obj_number,['year']] = obj['Key'].split(sep='/')[1].replace('year=','')
                df_obj.loc[obj_number,['month']] = obj['Key'].split(sep='/')[2].replace('month=','')
                df_obj.loc[obj_number,['day']] = obj['Key'].split(sep='/')[3].replace('.csv','')
                df_obj.loc[obj_number,['key']] = obj['Key']
                df_obj.loc[obj_number,['LastModified']] = obj['LastModified'].strftime('%Y-%m-%d %H:%M:%S',)
            obj_number += 1 
                # добавляем в датасет найденные значения
                
    df_obj['date'] = df_obj['year']+'-'+df_obj['month']+'-'+df_obj['day']
    df_obj['date'] = pd.to_datetime(df_obj['date'], dayfirst=False)

    df_obj = df_obj.reset_index().drop('index', axis=1)
    return df_obj


def list_of_missing_data(df_obj):
    s3 = get_s3_instance()
    # задаем стартовую и финишную дату на основе ключей
    start_date = df_obj.loc[0,['date']].values[0]
    end_date = df_obj.loc[df_obj.shape[0]-1,['date']].values[0]

    # создаем датасет с всеми датами
    dates_pd = pd.DataFrame({
        'date_range': pd.date_range(start=start_date, end=end_date),
        })

    # соединяем ранее полученный список и полный список дат
    dates_merged = dates_pd.merge(
        df_obj,
        left_on='date_range',
        right_on='date',
        how='left'
    )

    # фильтруем полученный список по null строкам и выносим даты в отедльный датафрейм
    df_of_missing_dates = pd.DataFrame()
    df_of_missing_dates['date'] = dates_merged[dates_merged['key'].isnull()]['date_range'] 
    df_of_missing_dates = df_of_missing_dates.reset_index().drop('index', axis=1)

    list_of_missing_dates = []

    # Подставляем под пропущенные даты ключ файла, из которого будет браться информация
    for i in range(df_of_missing_dates.shape[0]):

        next_date_day = int((df_of_missing_dates.loc[i,['date']] + datetime.timedelta(days=1)).iloc[0].strftime('%d'))
        next_date_month = (df_of_missing_dates.loc[i,['date']] + datetime.timedelta(days=1)).iloc[0].strftime('%m')
        next_date_year = (df_of_missing_dates.loc[i,['date']] + datetime.timedelta(days=1)).iloc[0].strftime('%Y')
        s3_file_name = f'{folder}/year={next_date_year}/month={next_date_month}/{next_date_day}.csv'
        try: 
            if s3.head_object(Bucket=bucket_name,Key=s3_file_name)['ResponseMetadata']['HTTPStatusCode'] == 200:
                df_of_missing_dates.loc[i,['next file key']] = s3_file_name
        except:
            continue
            
    # заполняем оставшиеся пропуски следующим не пустым значением
    # эта таблица  поможет выполнить дальнейшие действия
    df_of_missing_dates = df_of_missing_dates.bfill()
    return df_of_missing_dates


# В качестве ключей для скачивания используем ключи всех файлов из папки, полученные ранее

def downloading_missing_files_from_s3(path_on_pc,df_of_missing_dates):
    s3 = get_s3_instance()
    for i in range(0,df_of_missing_dates.shape[0]):
        s3_file_name = df_of_missing_dates.loc[i,['next file key']].values[0]
        s3_file_name_folder = '/'.join(s3_file_name.split(sep='/')[0:3])
        
        local_download_folder = f'{path_on_pc}/{s3_file_name_folder}'
        local_download_path = f'{path_on_pc}/{s3_file_name}' 
        
    # если данный файл уже скачан, он не скачивается повторно
        if os.path.isfile(local_download_path) is False:
        # создается новая папка, если ее нет
            os.makedirs(local_download_folder, exist_ok=True)
            s3.download_file(Bucket=bucket_name,Key=s3_file_name,Filename=local_download_path)


def adding_missing_data_to_pc(folder, path_on_pc, df_of_missing_dates):
    # проходимся по датафрейму пропущенных значений
    for i in range(df_of_missing_dates.shape[0]):
        new_date = df_of_missing_dates.loc[i,["date"]].values[0].strftime('%Y-%m-%d')
        df_of_missing_dates.loc[i,["date"]].values[0].strftime('year=%Y/month=%m/%d.csv')
        temp_df = pd.read_csv(f'{path_on_pc}/{df_of_missing_dates.loc[i,["next file key"]].values[0]}')
        # Обновляем дату
        temp_df['report_date'] = new_date
        temp_df.to_csv(f"{path_on_pc}/{folder}/{df_of_missing_dates.loc[i,['date']].values[0].strftime('year=%Y/month=%m/%#d.csv')}" ,sep=',',index=False)
        # Создаем внутри датасета отдельный список ключей для загрузки
        df_of_missing_dates.loc[i,['new_key']] = f"{folder}/{df_of_missing_dates.loc[i,['date']].values[0].strftime('year=%Y/month=%m/%#d.csv')}"


def upploading_missing_data_to_storage(bucket_name,path_on_pc):
    s3 = get_s3_instance()
    for i in range(0,df_of_missing_dates.shape[0]):
        s3_file_path = df_of_missing_dates.loc[i,['new_key']].values[0]
        pc_file_path = f'{path_on_pc}/{s3_file_path}'
        try:
            try:
                if s3.head_object(Bucket=bucket_name,Key=s3_file_path)['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(f'{s3_file_path} the file has already been uploaded!')
            except:
                s3.upload_file(pc_file_path, bucket_name, s3_file_path)
                print(f'{s3_file_path} has been uploaded')
        except Exception as e:
            print(e)
            continue


In [5]:
bucket_name = 'dwh-asgard' # имя бакета
s3 = get_s3_instance()

folder = 'cameras_st_asgard' # имя папки в бакете
Prefix = f"{folder}/"
df_obj = list_of_daily_objects_from_s3(bucket_name,Prefix)

df_of_missing_dates =  list_of_missing_data(df_obj)

path_on_pc = 'D:/s3' 
downloading_missing_files_from_s3(path_on_pc,df_of_missing_dates)
adding_missing_data_to_pc(folder, path_on_pc, df_of_missing_dates)
upploading_missing_data_to_storage(bucket_name,path_on_pc)

cameras_st_asgard/year=2024/month=08/2.csv has been uploaded
cameras_st_asgard/year=2024/month=08/3.csv has been uploaded
cameras_st_asgard/year=2024/month=08/4.csv has been uploaded
