In [1]:
import os
import numpy as np
import pandas as pd
import re
from math import isnan
from datetime import datetime
#
from google.api_core import exceptions

In [2]:
# Import custom classes for load and transform
from Reporter import Reporter
from Preprocessing import Preprocessing

Reporter - дополнительный класс, инцилизируется с помощью
- **reporter_parms**  
  `start_date`, `end_date`  - интервал, в котором скачиваются отчеты  
  `name` - название типа отчетов \\ TODO: чтобы был листом  
  `id` - идентификатор МП в appstore  
  `path_dist` - путь, куда грузятся отчеты (для удобства совпадает с base_dir)
  
После инцилизации есть 2 метода:
- extract():
Реализует выгрузку отчетов, согласно тому, как это было задано параметрами, можно проверить методом .show()
- update_dates(dates):
Обновляет даты в конфигурации

dates - формата list ([]), в котором даты должны быть 'YYYYMMDD'  
Например dates = ['20231231','20240101']

- show():

Preprocessing - дополнительный класс для обработки полей в отчетах от AppStore Connect 

# Параметры выгрузки
- **base_dir**  
Путь, в котором будут сложены отчеты (.csv)
- **appstore_id**   
Идентификатор МП в appstore
- **reporter_parms**  
  `start_date`, `end_date`  - интервал, в котором скачиваются отчеты  
  `name` - название типа отчетов \\ TODO: чтобы был листом  
  `id` - идентификатор МП в appstore  
  `path_dist` - путь, куда грузятся отчеты (для удобства совпадает с base_dir)  
- **project_id**  
 Название проекта в Big Query 
- **dataset_id**  
 Название датасета внутри проекта

- **credentials_json**  
 Путь к json (для аутентификации в сервисах GCP)


In [3]:
# Path in OS for download with PC owner script
base_dir = 'C:\\Users\\PC\\Desktop\\Notebooks\\datasets\\Apple Appstore connect data'
# AppStore identifies each mobile app as id, for example 12345678 (size = 8)
appstore_id = 89936618
reporter_parms = {'start_date': '2024/03/22',
                  'end_date': '2024/03/25',
                  'name': ' ',
                  'id': appstore_id,
                  'path_dst': base_dir}
# Big_Query Destination -> Structure: Project_id.Dataset_id.Table_name_id
# Project name in Big Query 
project_id = 'mapsme-81e63'
# Dataset name in Big Query
dataset_id = 'appstore_connect_data'
# Credentials for Auth in GCP 
credentials_json = '..\\..\\credentials\\big_query_admin.json'

# Выгрузка

In [10]:
update = Reporter(**reporter_parms)

In [11]:
# print(update.show())

Типы отчетов ['SubscriptionEvent', 'Subscriber'], c 20240322 по 20240325, в директорию - C:\Users\PC\Desktop\Notebooks\datasets\Apple Appstore connect data


In [None]:
# subscribers = Reporter(**reporter_parms)
# subscribers.extract()

In [None]:
# reporter_parms['name'] = 'SubscriptionEvent'
# events = Reporter(**reporter_parms)
# events.extract()

In [None]:
# update.extract()

# Предобработка

In [4]:
all_filenames = os.listdir(base_dir)

In [None]:
#Вывести список файлов по пути base_dir
#all_filenames

# Preprocessing data

In [6]:
def check_csv(table):
    if (((table['Event_Date'] == ' ') | (table['Event_Date'].isnull())).sum() != 0):
        raise EmptyDate('Колонка с `Event Date` без даты')
    if (table["Standard_Subscription_Duration"].isnull().sum() != 0 ):
        raise EmptyString('Недостает значение в `Standard Subscription Duration`')

In [7]:
%%time
dfs = {}
for file in all_filenames:
    path = os.path.join(base_dir,file)
    df = Preprocessing(path).get_data()
    try:
        check_csv(df)
    except:
        print(f'{file} сломан, предобработка не справилась с пропусками, поэтому загрузка невозможна')
        dfs.clear()
        break
    name_table = str(file[:-4:1].replace(r'_'+ str(appstore_id),'')[:-5:])
    if re.match(r'^(Subscriber|Subscription_Event)_202[3-9]\d{4}$', name_table):
        table_suffix = re.search('\d{8}',name_table).group(0)
        dfs.update(
                  {
                    name_table:
                    {'frame':df,
                     'table_suffix': table_suffix}
                   }
                  )
    else:
        print(f'По пути {path} неподходящее название файла')

CPU times: total: 15.6 ms
Wall time: 195 ms


In [8]:
dfs

{'Subscriber_20240322': {'frame':     Event_Date                        App_Name  App_Apple_ID  \
  0   2024-03-22  MAPS.ME: Offline Maps, GPS Nav     510623322   
  1   2024-03-22  MAPS.ME: Offline Maps, GPS Nav     510623322   
  2   2024-03-22  MAPS.ME: Offline Maps, GPS Nav     510623322   
  3   2024-03-22  MAPS.ME: Offline Maps, GPS Nav     510623322   
  4   2024-03-22  MAPS.ME: Offline Maps, GPS Nav     510623322   
  ..         ...                             ...           ...   
  642 2024-03-22  MAPS.ME: Offline Maps, GPS Nav     510623322   
  643 2024-03-23  MAPS.ME: Offline Maps, GPS Nav     510623322   
  644 2024-03-22  MAPS.ME: Offline Maps, GPS Nav     510623322   
  645 2024-03-22  MAPS.ME: Offline Maps, GPS Nav     510623322   
  646 2024-03-22  MAPS.ME: Offline Maps, GPS Nav     510623322   
  
                      Subscription_Name  Subscription_Apple_ID  \
  0                       PRO 2 Monthly             6449304586   
  1                       PRO 2 Monthly  

# Load data to BigQuery

## Connect

In [None]:
from google.oauth2.service_account import Credentials
from google.cloud import bigquery as bq

In [None]:
load_config_parms = {
                     'autodetect': True,
                     'create_disposition':'CREATE_IF_NEEDED',
                     'destination_table_description' : 'Data load by @dki script'
                    }

In [None]:
# 1
job_config = bq.LoadJobConfig(**load_config_parms)
credentials = Credentials.from_service_account_file(credentials_json) # создание объекта с кредо
client = bq.Client(credentials=credentials,
                   project=project_id,
                   default_load_job_config=job_config)
# 2
subscriber_schema = client.schema_from_json('schema/subscriber.json')
subcription_event_schema = client.schema_from_json('schema/subcription_event.json')

In [None]:
from datetime import datetime
# Последний Table_suffix
query_job = client.query(f''' 
SELECT ANY_VALUE (_TABLE_SUFFIX HAVING MAX PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) ) AS last_date 
FROM  `{dataset_id}.Subscriber_*`
UNION ALL 
SELECT ANY_VALUE (_TABLE_SUFFIX HAVING MAX PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) ) AS last_date 
FROM  `{dataset_id}.Subscription_Event_*`''')
try:
    dates = query_job.to_dataframe().rename(index={0:'Subscriber_*',1:'Subscription_Event_*'})
    print(dates)
    dates_dict = {'Subscriber':dates.loc['Subscriber_*','last_date'],
                 'Subscription_Event': dates.loc['Subscription_Event_*','last_date']}
except exceptions.BadRequest as exc:
    print(exc)


## Load

In [None]:
def is_there_in_wildcards(name_table,table_suffix):
    """
    name_table, 
    table_suffix

    
    """
    if(re.match('^Subscriber.+', name_table)):
        if datetime.strptime(table_suffix,'%Y%m%d')  <= datetime.strptime(dates_dict['Subscriber'],'%Y%m%d'):
                print(f'{name_table} имеет table_suffix раньше крайней даты, что есть среди BQ Wildcards')
                return False
        return True
    elif (re.match('^Subscription.+',name_table)):
        if datetime.strptime(table_suffix,'%Y%m%d') <= datetime.strptime(dates_dict['Subscription_Event'],'%Y%m%d'):
                print(f'{name_table} имеет table_suffix раньше крайней даты, что есть среди BQ Wildcards')
                return False
        return True
    else:
        print(f'{name_table} не подходит под формат `Subscriber_...` или `Subscription_Event_...`')
        return False
    
jobs = []



for name_table,data in dfs.items():
    if re.match('^Subscription_Event.+', name_table):
        job_config.schema = subcription_event_schema
    else:
        job_config.schema = subscriber_schema
    job = client.load_table_from_dataframe(dataframe=data['frame'],
                                          destination=f'{project_id}.{dataset_id}.{name_table}',
                                          job_config=job_config)


# Result 

In [None]:
help(client)

In [None]:
# Последний Table_suffix
query_job = client.query(f''' 
SELECT 
    table_suffix_date,
    DATE_DIFF(table_suffix_date, LAG(table_suffix_date) OVER (ORDER BY table_suffix_date), DAY) as date_diff
FROM (
    SELECT 
DISTINCT PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) AS table_suffix_date
FROM  `{dataset_id}.Subscription_Event_*`)
''')

In [None]:
all_tables = query_job.to_dataframe()

In [None]:
more_one_day = all_tables['date_diff'] > 1
all_tables[more_one_day]['table_suffix_date']