In [2]:
import os
import time
import json
import requests 
from datetime import datetime


def save_request(request_data, page_number, file_path='events_data.json'):
    """Append one page of API objects to a JSON array stored under ``datasets/``.

    The target file is read in full (when it already exists), the new objects
    are appended, and the whole list is written back with 4-space indentation.

    Parameters
    ----------
    request_data : iterable of dict
        Objects taken from one response page. Each dict is modified in place:
        a ``'db_stamp'`` key holding the ISO-8601 result of ``datetime.now()``
        for this call is added (the same value for every object of the call).
    page_number : Any
        Printed once the file has been written; used only as progress output.
    file_path : str
        File name relative to the ``datasets/`` directory.

    Returns
    -------
    None
    """
    target_path = 'datasets/' + file_path

    # Create the output directory on first use instead of failing on open().
    os.makedirs(os.path.dirname(target_path), exist_ok=True)

    # Load what was stored by earlier calls, if anything.
    # (Named `stored_objects` so the `requests` HTTP module is not shadowed.)
    if os.path.exists(target_path):
        with open(target_path, 'r') as file:
            stored_objects = json.load(file)
    else:
        stored_objects = []

    # One timestamp per call, shared by every object of this page.
    tmp_timestamp = datetime.now().isoformat()
    for tm_object in request_data:
        tm_object['db_stamp'] = tmp_timestamp
        stored_objects.append(tm_object)

    # Save the updated list back to the file.
    with open(target_path, 'w') as file:
        json.dump(stored_objects, file, indent=4)

    print(page_number)

In [3]:
# pipeline to download information from an endpoint

error_case = None
def ticketmaster_download_data(object_to_retrieve,start= '2023-11-01T00:00:00Z',end ='2024-11-01T00:00:00Z',page_size = '80'):
    """Download all pages of a Ticketmaster Discovery v2 resource into an NDJSON file.

    Pages are fetched one after another by following the ``_links.next.href``
    value of each response, with a one second pause between requests. Every
    page is appended to ``datasets/<object>_data.json`` through
    ``save_request``. When paging ends, the number of stored rows is compared
    with ``page.totalElements`` of the first response, the rows are rewritten
    one JSON document per line to ``datasets/<object>_data_f.json`` and the
    intermediate file is deleted.

    Parameters
    ----------
    object_to_retrieve : str
        Resource name; used in the URL path, as the key inside ``_embedded``
        and as the prefix of the output file names.
    start, end : str
        Values sent as ``startDateTime`` / ``endDateTime``.
    page_size : str
        Value sent as ``size`` (a string, because the URL is concatenated).

    Returns
    -------
    None

    Side effects
    ------------
    Reads the API key from ``api_key.txt``; stores the most recent HTTP
    response in the module-level ``error_case`` for later inspection.
    """
    global error_case

    print(f'Object to extract: {object_to_retrieve}')

    # Close the key file deterministically and drop surrounding whitespace:
    # a trailing newline would otherwise become part of the request URL.
    with open('api_key.txt', 'r') as key_file:
        consumer_key = key_file.read().strip()
    country_code = 'US'
    request_timeout = 30  # seconds; avoids hanging forever on a stalled connection

    base_url = 'https://app.ticketmaster.com'

    url0 = f'https://app.ticketmaster.com/discovery/v2/{object_to_retrieve}.json?'
    url0 += 'countryCode=' + country_code
    url0 += '&startDateTime=' + start
    url0 += '&endDateTime=' + end
    # url0 += '&classificationName=' + 'music'
    url0 += '&size=' + page_size + '&apikey=' + consumer_key

    raw_name = f'{object_to_retrieve}_data.json'
    raw_path = f'datasets/{raw_name}'
    formatted_path = f'datasets/{object_to_retrieve}_data_f.json'

    # save_request appends to an existing file, so a raw file left behind by an
    # interrupted run would duplicate rows and make the validation below fail.
    if os.path.exists(raw_path):
        os.remove(raw_path)

    # --- first page: also tells us how many pages/elements to expect ---------
    events_list = requests.request('GET', url0, timeout=request_timeout)
    error_case = events_list
    payload = events_list.json()  # parse once, reuse below
    total_pages = payload['page']['totalPages']
    total_elements = payload['page']['totalElements']

    if total_pages == 0:
        # Nothing was written, so there is nothing to validate or reformat.
        print('There are no elements to retrieve.')
        return

    save_request(
        payload['_embedded'][object_to_retrieve]
        , str(payload['page']['number'])
        , raw_name
        )
    pages_done = 1
    print(f'Total pages: {total_pages}')
    if total_pages > 1000:
        print('it will break')

    # --- remaining pages ------------------------------------------------------
    while pages_done < total_pages:
        # in order to prevent the api_key to be throttled
        time.sleep(1)

        next_link = payload.get('_links', {}).get('next', '')
        if next_link == '':
            # the API reports no further page: stop downloading
            break

        events_list = requests.request(
            'GET',
            base_url + next_link['href'] + '&apikey=' + consumer_key,
            timeout=request_timeout,
        )
        error_case = events_list
        try:
            payload = events_list.json()
            save_request(
                payload['_embedded'][object_to_retrieve]
                , str(payload['page']['number'])
                , raw_name
                )
        except Exception as e:
            # A response without the expected structure carries no usable
            # 'next' link; report it and stop. The validation below prints
            # the response body.
            print(e)
            break
        pages_done += 1

    # --- validate the downloaded data ----------------------------------------
    with open(raw_path, 'r') as file:
        full_data = json.load(file)

    if len(full_data) == total_elements:
        print(f'The download of the object {object_to_retrieve} was successful.')
        print(f'Total elements downloaded: {total_elements}')
    else:
        print('There was an issue in the pipeline')
        print("Here is the last request's response ")
        print('VVVVVVVV')
        print(error_case.text)
        print('')
        print(f'Rows extracted: {len(full_data)}' )

    # the json file needs to be formatted in the proper formatting for GCP
    # (newline-delimited JSON: one object per line)
    print('Prepare data to have BigQuery necessary formatting.')
    with open(formatted_path, "w") as new_file:
        for row in full_data:
            new_file.write(json.dumps(row))
            new_file.write("\n")

    # delete unformatted version of the data
    os.remove(raw_path)

In [4]:
# function to upload raw data into BigQuery

from google.cloud import bigquery

# Set up the module-level state for the service-account connection: the
# GOOGLE_APPLICATION_CREDENTIALS variable names the key file, and `client` is
# the BigQuery client shared by the cells below.

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'dbt_service_account_key.json'
client = bigquery.Client(project='ticketmasterargodemo')


def upload_data_to_bigquer(object_of_interest):
    """Load ``datasets/<object>_data_f.json`` into the table ``stage.<object>_tb``.

    The file is submitted as a newline-delimited JSON load job with schema
    auto-detection, using the module-level ``client``. The call blocks until
    the job has finished and then prints the number of loaded rows.

    Parameters
    ----------
    object_of_interest : str
        Prefix of both the source file name and the destination table name.

    Returns
    -------
    None
    """
    filename = f'datasets/{object_of_interest}_data_f.json'
    dataset_id = 'stage'
    table_id = f'{object_of_interest}_tb'

    # Client.dataset() is deprecated; build the reference explicitly instead.
    table_ref = bigquery.DatasetReference(client.project, dataset_id).table(table_id)

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,
    )

    with open(filename, "rb") as source_file:
        job = client.load_table_from_file(
            source_file,
            table_ref,
            # location="us-east4",  # This is region specific.
            location="us",  # This is a multiregion.
            job_config=job_config,
        )  # API request

    job.result()  # Waits for table load to complete.

    print("Loaded {} rows for object {} into {}:{}.".format(job.output_rows, object_of_interest, dataset_id, table_id))

In [None]:
# Build the query window as UTC strings in the 'YYYY-MM-DDT00:00:00Z' form
# that ticketmaster_download_data puts into startDateTime / endDateTime.
# NOTE(review): on the first day of a month both values are identical, so the
# attractions/venues window has zero length — confirm this is intended.
start_of_month = datetime.utcnow().replace(day=1).strftime('%Y-%m-%dT00:00:00Z')
current_date = datetime.utcnow().strftime('%Y-%m-%d') + 'T00:00:00Z'

# Events are requested from a fixed start date with 50 rows per page;
# attractions and venues are requested for the current month with 80 rows per page.
ticketmaster_download_data('events','2023-11-01T00:00:00Z',current_date,'50')
print('')
ticketmaster_download_data('attractions',start_of_month,current_date,'80') # this is pending
print('')
ticketmaster_download_data('venues',start_of_month,current_date,'80') # got limited to only 1000 records per deep-page request

Object to extract: events
0
Total pages: 24
1


Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x11ba5c6d0>>
Traceback (most recent call last):
  File "/opt/anaconda3/envs/ticketmaster_poc/lib/python3.10/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(
KeyboardInterrupt: 


2
3
4
5
6
7
8


In [None]:
# Drop the stage tables so that the following load starts from empty tables.
# `object_name` (instead of `object`) keeps the builtin `object` usable in the
# rest of the kernel session.
for object_name in ['events','attractions','venues']:

    # IF EXISTS: the cell must also succeed on a first run or a re-run, when
    # the table is not there (a plain DROP TABLE raises in that case).
    query_string = f"""DROP TABLE IF EXISTS `ticketmasterargodemo.stage.{object_name}_tb`;"""
    results = client.query_and_wait(query_string)

    print(f'The table {object_name} has been cleaned.')

The table events has been cleaned.
The table attractions has been cleaned.
The table venues has been cleaned.


In [None]:
# Load each formatted file into its stage table: events, attractions, venues (in that order).
for stage_object in ('events', 'attractions', 'venues'):
    upload_data_to_bigquer(stage_object)

Loaded 820 rows for object events into stage:events_tb.
Loaded 1040 rows for object attractions into stage:attractions_tb.
Loaded 1040 rows for object venues into stage:venues_tb.


In [1]:
# Run the listed dbt models from the shell (IPython `!` escape).
!dbt run --select events_elt.sql classification_elt.sql event_attractions_elt.sql priceranges_elt.sql products_elt.sql venues_elt.sql

[0m04:58:09  Running with dbt=1.8.8
[0m04:58:10  Registered adapter: bigquery=1.8.3
There are 2 unused configuration paths:
- models.stage.stage_db
- models.stage.production
[0m04:58:10  Found 8 models, 4 data tests, 11 sources, 479 macros
[0m04:58:10  
[0m04:58:11  Concurrency: 4 threads (target='dev')
[0m04:58:11  
[0m04:58:11  1 of 6 START sql incremental model production.classification_elt ............... [RUN]
[0m04:58:11  2 of 6 START sql incremental model production.event_attractions_elt ............ [RUN]
[0m04:58:11  3 of 6 START sql incremental model production.events_elt ....................... [RUN]
[0m04:58:11  4 of 6 START sql incremental model production.priceranges_elt .................. [RUN]
[0m04:58:14  3 of 6 OK created sql incremental model production.events_elt .................. [[32mCREATE TABLE (820.0 rows, 231.4 KiB processed)[0m in 2.85s]
[0m04:58:14  5 of 6 START sql incremental model production.products_elt ..................... [RUN]
[0m04:5

### TEST ENV

In [29]:
# queries to test connection
# 1

# query_string = """SELECT name, SUM(number) as total
# FROM `bigquery-public-data.usa_names.usa_1910_current`
# WHERE name = 'William'
# GROUP BY name;
# """
# results = client.query_and_wait(query_string)

# # Print the results.
# for row in results:  # Wait for the job to complete.
#     print("{}: {}".format(row["name"], row["total"]))



# 2
# query_string = """SELECT *
# FROM `bigquery-public-data.usa_names.usa_1910_current`
# WHERE name = 'William'
# ;
# """
# results = client.query_and_wait(query_string)

# for row in results:
#     print(row)


# 3 , for deleting the data
# query_string = """TRUNCATE TABLE `ticketmaster-demo-argo.stage_db.events_tb`;
# """
# results = client.query_and_wait(query_string)

# for i in results:
#     print(i)