In [1]:
import pandas as pd
import numpy as np
import requests
import warnings
import os 
import pendulum

from tqdm import tqdm, trange
from datetime import datetime, timedelta
from functools import partial

from google.cloud import bigquery, storage

# from utils.gcp import (create_if_not_exist_google_storage, get_gcp_file_count,
#                        get_gcp_total_size, upload_to_bucket,
#                        validate_bucket_files)
# from utils.slack import on_failure, on_success
# from utils.util import make_directory

# from airflow import DAG
# from airflow.operators.dummy import DummyOperator
# from airflow.operators.python import PythonOperator

# Display every column when a DataFrame is rendered (no column truncation).
pd.set_option('display.max_columns', None)
# pd.set_option('display.max_rows', None)
# Turn off pandas' chained-assignment check for the whole notebook.
pd.set_option('mode.chained_assignment',  None) 

# Silence every FutureWarning.
# NOTE(review): this also hides library deprecation notices (for example the one
# pandas emitted for DataFrame.append before removing it) -- confirm this is wanted.
warnings.simplefilter(action='ignore', category=FutureWarning)

# os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "****"



In [2]:
## get JSON from API url

def get_json(depart_date, depart, arrive, timeout=30):
    """Fetch the fare-search result for one route and departure date as JSON.

    Two requests are made: the search page (``url1``) is requested first to
    obtain a ``JSESSIONID`` cookie, then the JSON endpoint (``url2``) is
    requested with that cookie.

    Parameters
    ----------
    depart_date : str
        Departure date placed in the ``depdt`` query parameter; callers in this
        notebook pass ``'%Y-%m-%d'`` formatted strings.
    depart : str
        Departure city code placed in ``depctycd`` (e.g. ``'SEL'``).
    arrive : str
        Arrival city code placed in ``arrctycd`` (e.g. ``'FUK'``).
    timeout : float or tuple, optional
        Timeout in seconds handed to both HTTP requests so that a stalled
        connection cannot block the crawl forever. Defaults to 30.

    Returns
    -------
    The decoded JSON body of the second response (``res.json()``).

    Raises
    ------
    requests.HTTPError
        If either request answers with an HTTP error status.
    requests.Timeout
        If either request exceeds ``timeout``.
    KeyError
        If the first response does not set a ``JSESSIONID`` cookie.
    """
    # Only depctycd / arrctycd / depdt vary; every other query value is fixed.
    url1 = (
        "https://air.jejupass.com/air/agent/b2c/AIR/INT/AIRINTSCH0100100010.k1?"
        + "initform=OW"
        + "&initCnt=0"
        + "&initMax=2"
        + "&domintgubun=I"
        + "&depdomintgbn=D"
        + "&tasktype=B2C"
        + "&servicecacheyn=Y"
        + "&secrchType=FARE"
        + "&maxprice="
        + "&availcount=250"
        + "&preferaircd="
        + "&depctycd=" + depart
        + "&depctycd="
        + "&depctycd="
        + "&depctycd="
        + "&depctynm="
        + "&depctynm="
        + "&depctynm="
        + "&depctynm="
        + "&arrctycd=" + arrive
        + "&arrctycd="
        + "&arrctycd="
        + "&arrctycd="
        + "&arrctynm="
        + "&arrctynm="
        + "&arrctynm="
        + "&arrctynm="
        + "&depdt=" + depart_date
        + "&depdt="
        + "&depdt="
        + "&depdt="
        + "&opencase=N"
        + "&opencase=N"
        + "&opencase=N"
        + "&openday="
        + "&openday="
        + "&openday="
        + "&opendayNm="
        + "&opendayNm="
        + "&opendayNm="
        + "&nonstop="
        + "&adtcount=1"
        + "&chdcount=0"
        + "&infcount=0"
        + "&cabinclass=Y"
        + "&research=false"
        + "&anywhereDate="
        + "&KSESID=air%3Ab2c%3ASELK138XF%3ASELK138XF%3A%3A00"
    )
    url2 = "https://air.jejupass.com/air/agent/b2c/AIR/INT/AIRINTSCH010010001001.k1jsn?servicecacheyn=Y&requestedfaretype=&KSESID=air%3Ab2c%3ASELK138XF%3ASELK138XF%3A%3A00"

    # The session is used as a context manager so its pooled connection is
    # released even when a request fails.
    with requests.Session() as req:
        search_res = req.get(url1, timeout=timeout)
        search_res.raise_for_status()
        cookie = {
            'JSESSIONID': search_res.cookies['JSESSIONID']
        }

    # As before, the JSON endpoint is called with the JSESSIONID cookie only.
    res = requests.get(url2, cookies=cookie, timeout=timeout)
    res.raise_for_status()

    return res.json()

In [3]:
## JSON parsing and get details

def _concat_frames(frames):
    """Stack frames row-wise with a fresh 0..n-1 index; an empty list gives an empty frame."""
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


def get_data(json_data):
    """Flatten the search response into price, routing and segment tables.

    Parameters
    ----------
    json_data : dict
        Response dictionary. ``json_data['totalResult']`` must hold the
        mappings ``'itineraries'``, ``'routings'`` and ``'segments'``.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(price_df, routing_df, time_df)``, each with a 0..n-1 index:

        * ``price_df``  - per itinerary: its id/airline/seat/via/routingIds,
          the price columns of its first fare, and booking/cabin/cabinNm of
          its fares, placed side by side.
        * ``routing_df`` - one normalized frame per entry of ``'routings'``.
        * ``time_df``    - one normalized frame per entry of ``'segments'``.

        A mapping with no entries yields an empty DataFrame.

    Raises
    ------
    KeyError
        If an expected key or column is missing from the response.
    """
    result = json_data['totalResult']

    # Frames are collected in lists and concatenated once: DataFrame.append no
    # longer exists in pandas 2.x, and growing a frame inside a loop copies all
    # previously accumulated rows on every iteration.

    ## Price info
    price_frames = []
    for itinerary in result['itineraries'].values():
        fares = itinerary['fares']

        df1 = pd.json_normalize(itinerary)
        df2 = pd.json_normalize(fares[0]['prices'])  # prices of the first fare only
        df3 = pd.json_normalize(fares)

        df1 = df1[['id', 'airline', 'seat', 'via', 'routingIds']]
        df2 = df2[['price', 'originPrice', 'fuel', 'tax', 'tasf', 'que', 'etc', 'total', 'discount']]
        df3 = df3[['booking', 'cabin', 'cabinNm']]

        price_frames.append(pd.concat([df1, df2, df3], axis=1))

    price_df = _concat_frames(price_frames)

    ## routing info
    routing_df = _concat_frames(
        [pd.json_normalize(routing) for routing in result['routings'].values()]
    )

    ## Time info
    time_df = _concat_frames(
        [pd.json_normalize(segment) for segment in result['segments'].values()]
    )

    return price_df, routing_df, time_df

In [4]:
## data processing

def data_processing(price_df, routing_df, time_df):
    """Join price, routing and segment tables into one typed row per direct fare.

    Steps:
      1. keep rows of ``price_df`` with ``via == 0``;
      2. turn the sequence-valued columns (routingIds, booking, cabin,
         cabinNm) into comma-joined strings;
      3. reduce each routing's ``segmentIds`` to its first element;
      4. left-join prices -> routings (``routingIds`` = ``id``) -> segments
         (``segmentIds`` = ``id``) and sort by ``total``;
      5. split ``depDate`` / ``arrDate`` into an 8-character date part and a
         4-character time part (characters 8..11);
      6. select the output columns, add today's date as ``searchDate``
         (``%Y%m%d``) and cast the columns to string / int.

    None of the three input frames is modified.

    Parameters
    ----------
    price_df, routing_df, time_df : pandas.DataFrame
        Frames shaped like the ones returned by ``get_data``.

    Returns
    -------
    pandas.DataFrame
        29 columns (28 flight/fare columns plus ``searchDate``), 0..n-1 index,
        ordered by ascending ``total``.

    Raises
    ------
    KeyError
        If an expected column is missing from an input frame.
    """
    joined_cols = ['routingIds', 'booking', 'cabin', 'cabinNm']
    text_cols = [
        'depDate', 'depTime', 'arrDate', 'arrTime',
        'depCity', 'depCityNm', 'arrCity', 'arrCityNm',
        'depAirport', 'depAirportNm', 'arrAirport', 'arrAirportNm',
        'airline', 'flightNo', 'equipment', 'equipmentNm',
        'booking', 'cabin',
    ]
    int_cols = [
        'seat', 'via', 'originPrice', 'price', 'fuel',
        'tax', 'tasf', 'que', 'etc', 'total',
    ]

    # Direct only. query() + reset_index() produce a new frame, so the column
    # assignments below never reach the caller's price_df.
    direct_df = price_df.query("via == 0").reset_index(drop=True)
    for col in joined_cols:
        # Column-wise replacement for the former row-by-row .loc loop.
        direct_df[col] = direct_df[col].map(lambda values: ','.join(map(str, values)))

    # First segment id of every routing. Building the helper frame on
    # routing_df's own index keeps rows aligned whatever that index is, and
    # assign() leaves the caller's routing_df untouched.
    first_segment = pd.DataFrame(routing_df['segmentIds'].tolist(), index=routing_df.index)[0]
    routing_slim = routing_df.assign(segmentIds=first_segment).drop(
        columns=['via', 'fstamp', 'fminute', 'depTime', 'jtime', 'ctime', 'atime', 'flightDay', 'codeshare']
    )

    time_slim = time_df.drop(
        columns=['airline', 'ctime', 'jtime', 'depTimestamp', 'arrTimestamp', 'fstamp', 'fminute', 'depTime', 'stopovers', 'srvlist']
    )

    merged = (
        direct_df
        .merge(routing_slim, left_on='routingIds', right_on='id', how='left')
        .merge(time_slim, left_on='segmentIds', right_on='id', how='left')
        .sort_values('total')
        .reset_index(drop=True)
    )

    # All four values are computed from the untouched depDate / arrDate
    # columns before assign() writes any of them back.
    merged = merged.assign(
        depTime=merged['depDate'].str.slice(8, 12),
        depDate=merged['depDate'].str.slice(0, 8),
        arrTime=merged['arrDate'].str.slice(8, 12),
        arrDate=merged['arrDate'].str.slice(0, 8),
    )

    # 'discount' and 'an' were considered for the output but are left out.
    search_date = datetime.today().strftime('%Y%m%d')
    final_df = (
        merged[text_cols + int_cols]
        .assign(searchDate=search_date)
        .reset_index(drop=True)
    )

    dtypes = {col: 'string' for col in text_cols}
    dtypes.update({col: int for col in int_cols})
    dtypes['searchDate'] = 'string'

    return final_df.astype(dtypes)

In [5]:
## Main function

def airline_ticket_japan():
    """Crawl fares for every listed route, both directions, for the next 60 days.

    For each departure date from tomorrow through today + 60 days and each
    route, the search result is fetched with ``get_json``, flattened with
    ``get_data`` and cleaned with ``data_processing``. Searches whose
    ``totalResult.criteria`` is ``None`` are reported on stdout and skipped.

    Returns
    -------
    pandas.DataFrame
        All per-search frames stacked with a unique 0..n-1 index, or an empty
        DataFrame when no search produced data.
    """
    kor_airports = ['SEL', 'CJU', 'PUS', 'CJJ', 'KWJ', 'TAE']
    jap_airports = ['FUK', 'TYO', 'SPK', 'NGO', 'OKA', 'KIX']

    # Each pair is searched in both directions.
    air_route = []
    for depart in kor_airports:
        for arrive in jap_airports:
            air_route.append([depart, arrive])
            air_route.append([arrive, depart])

    # Read the clock once so a run that crosses midnight still covers one
    # consistent 60-day window.
    today = datetime.today()

    # Collected in a list and concatenated once at the end: DataFrame.append no
    # longer exists in pandas 2.x and re-copied all accumulated rows per call.
    frames = []

    for i in trange(1, 61):
        date = (today + timedelta(i)).strftime('%Y-%m-%d')

        for depart, arrive in air_route:
            json_data = get_json(date, depart, arrive)

            if json_data['totalResult']['criteria'] is None:
                print(date, depart, arrive, "is None")
                continue

            price_df, routing_df, time_df = get_data(json_data)
            total_df = data_processing(price_df, routing_df, time_df)

            # Disabled per-search upload, kept for reference:
            # file_name = f"airline_japan_dep={depart}_arr={arrive}_depDate={date}.parquet"
            # env = "prd" # stg: test
            # root_directory = "crawling_data/airline_japan"
            # try:
            #     upload_to_bucket(total_df, file_name, env, root_directory)
            # except Exception as e:
            #     print(e)
            #     print("Failed to upload to bucket")

            frames.append(total_df)

    if not frames:
        return pd.DataFrame()

    # ignore_index gives unique row labels; every per-search frame starts at 0.
    return pd.concat(frames, ignore_index=True)

In [6]:
kor_airpot = ['SEL', 'CJU', 'PUS', 'CJJ', 'KWJ', 'TAE']
jap_airport = ['FUK', 'TYO', 'SPK', 'NGO', 'OKA', 'KIX']

# Every combination of the two lists, in both directions:
# [depart, arrive] immediately followed by [arrive, depart].
air_route = []
for depart in kor_airpot:
    for arrive in jap_airport:
        air_route += [[depart, arrive], [arrive, depart]]

In [7]:
## Without Multithreading

# Per-search frames are collected in a list and concatenated once after the
# loop: DataFrame.append no longer exists in pandas 2.x and re-copied all
# accumulated rows on every call.
daily_frames = []

for i in trange(1, 4):
    date = datetime.today() + timedelta(i)
    date = date.strftime('%Y-%m-%d')

    for depart, arrive in air_route:
        json_data = get_json(date, depart, arrive)

        if json_data['totalResult']['criteria'] is None:
            # print(date, depart, arrive, "is None")
            continue

        price_df, routing_df, time_df = get_data(json_data)
        total_df = data_processing(price_df, routing_df, time_df)

        daily_frames.append(total_df)

# ignore_index gives unique row labels (each per-search frame starts at 0).
final_df = pd.concat(daily_frames, ignore_index=True) if daily_frames else pd.DataFrame()

100%|██████████| 3/3 [08:29<00:00, 169.76s/it]


In [10]:
# Display the fares gathered by the sequential crawl.
final_df

Unnamed: 0,depDate,depTime,arrDate,arrTime,depCity,depCityNm,arrCity,arrCityNm,depAirport,depAirportNm,arrAirport,arrAirportNm,airline,flightNo,equipment,equipmentNm,booking,cabin,seat,via,originPrice,price,fuel,tax,tasf,que,etc,total,searchDate
0,20230110,1440,20230110,1555,SEL,서울,FUK,후쿠오카,ICN,인천,FUK,후쿠오카,TW,0295,737,BOEING 737 ALL SERIES PASSENGER,Z,M,3,0,170000,170000,18400,28000,0,18400,0,216400,20230109
1,20230110,1805,20230110,1930,SEL,서울,FUK,후쿠오카,ICN,인천,FUK,후쿠오카,TW,0293,737,BOEING 737 ALL SERIES PASSENGER,Z,M,2,0,170000,170000,18400,28000,0,18400,0,216400,20230109
2,20230110,1815,20230110,1935,SEL,서울,FUK,후쿠오카,ICN,인천,FUK,후쿠오카,OZ,0136,32Q,,E,M,1,0,180000,180000,26500,28000,0,0,26500,234500,20230109
3,20230110,0735,20230110,0855,SEL,서울,FUK,후쿠오카,ICN,인천,FUK,후쿠오카,7C,1408,738,BOEING 737-800,K,M,3,0,189000,189000,24200,28000,0,0,24200,241200,20230109
4,20230110,1010,20230110,1130,SEL,서울,FUK,후쿠오카,ICN,인천,FUK,후쿠오카,TW,0291,737,BOEING 737 ALL SERIES PASSENGER,W,M,4,0,200000,200000,18400,28000,0,18400,0,246400,20230109
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
0,20230112,1955,20230112,2100,FUK,후쿠오카,TAE,대구,FUK,후쿠오카,TAE,대구,TW,0234,737,BOEING 737 ALL SERIES PASSENGER,Y,M,6,0,172500,172500,56600,19000,0,56600,0,248100,20230109
0,20230112,1100,20230112,1320,TAE,대구,TYO,도쿄,TAE,대구,NRT,나리타,TW,0201,737,BOEING 737 ALL SERIES PASSENGER,Y,M,8,0,295000,295000,31500,23000,0,31500,0,349500,20230109
0,20230112,1420,20230112,1650,TYO,도쿄,TAE,대구,NRT,나리타,TAE,대구,TW,0202,737,BOEING 737 ALL SERIES PASSENGER,O,M,4,0,115000,115000,56600,35200,0,56600,0,206800,20230109
0,20230112,1015,20230112,1145,TAE,대구,OSA,오사카,TAE,대구,KIX,오사카/간사이,TW,0287,737,BOEING 737 ALL SERIES PASSENGER,Y,M,6,0,270000,270000,18400,23000,0,18400,0,311400,20230109


# Multi Threading

In [9]:
def test(dt):
    """Crawl three consecutive departure dates for every route in ``air_route``.

    Parameters
    ----------
    dt : int
        Offset in days from today of the first departure date; the dates
        today + dt, today + dt + 1 and today + dt + 2 are searched.

    Returns
    -------
    pandas.DataFrame
        All per-search frames stacked with a unique 0..n-1 index, or an empty
        DataFrame when no search produced data.

    Notes
    -----
    Reads the module-level ``air_route`` list. Searches whose
    ``totalResult.criteria`` is ``None`` are skipped silently.
    """
    # Read the clock once so the three dates stay consecutive even if the run
    # crosses midnight.
    today = datetime.today()

    # Collected in a list and concatenated once at the end: DataFrame.append no
    # longer exists in pandas 2.x and re-copied all accumulated rows per call.
    frames = []

    for i in trange(dt, dt+3):
        date = (today + timedelta(i)).strftime('%Y-%m-%d')

        for depart, arrive in air_route:
            json_data = get_json(date, depart, arrive)

            if json_data['totalResult']['criteria'] is None:
                # print(date, depart, arrive, "is None")
                continue

            price_df, routing_df, time_df = get_data(json_data)
            frames.append(data_processing(price_df, routing_df, time_df))

    if not frames:
        return pd.DataFrame()

    # ignore_index gives unique row labels; every per-search frame starts at 0.
    return pd.concat(frames, ignore_index=True)

In [None]:
# Run the three-day crawl starting from tomorrow (offset 1) in the main thread
# and display the result.
test_df = test(1)
test_df

In [None]:
from concurrent.futures import ThreadPoolExecutor

# Submit one crawl to a thread pool. Leaving the `with` block waits for the
# submitted call to finish before the pool is shut down.
with ThreadPoolExecutor(max_workers = 3) as executor:

    x = 1

    t1 = executor.submit(test, x)
    # t2 = executor.submit(test, x+7)
    # t3 = executor.submit(test, x+14)

# `t1` is the Future returned by submit(), not the DataFrame itself;
# `t1.result()` gives the value returned by test().
t1

In [None]:
# Submit three crawls with start offsets x, x+7 and x+14 days to a pool of
# three worker threads. Each print runs right after its submit() call returns.
# NOTE(review): the futures' results are never read in this cell; call
# .result() on t1/t2/t3 to obtain the frames and surface worker exceptions.
with ThreadPoolExecutor(max_workers = 3) as executor:

    x = 1

    t1 = executor.submit(test, x)
    print("t1 running")

    t2 = executor.submit(test, x+7)
    print("t2 running")

    t3 = executor.submit(test, x+14)
    print("t3 running")

In [None]:
# Inspect the type of the object returned by executor.submit().
type(t1)

# Asyncio

In [None]:
import aiohttp
import asyncio

In [None]:
URL = 'https://httpbin.org/uuid'


async def fetch(session, url):
    """GET ``url`` through ``session`` and print the ``'uuid'`` field of the JSON reply.

    ``session.get(url)`` must be usable as an async context manager whose
    value offers an awaitable ``json()``. Nothing is returned.
    """
    async with session.get(url) as response:
        payload = await response.json()
        print(payload['uuid'])

async def main():
    """Run 100 concurrent ``fetch`` calls against ``URL`` over one shared client session."""
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(fetch(session, URL) for _ in range(100)))

def func():
    """Run ``main()`` to completion from synchronous code.

    ``asyncio.run`` cannot be called from a thread that already has a running
    event loop (it raises ``RuntimeError``), which is the situation inside a
    Jupyter kernel. When such a loop is detected, ``main()`` is run in a worker
    thread with its own event loop instead; otherwise ``asyncio.run`` is called
    directly. Either way the call blocks until ``main()`` has finished, and an
    exception raised by ``main()`` propagates to the caller.
    """
    try:
        asyncio.get_running_loop()
        loop_running = True
    except RuntimeError:
        # get_running_loop() raises when this thread has no running loop.
        loop_running = False

    if not loop_running:
        asyncio.run(main())
        return

    # Local import so this block does not rely on another cell's imports.
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=1) as pool:
        # The coroutine is created inside the worker so that it belongs to the
        # loop that asyncio.run starts there; .result() re-raises its errors.
        pool.submit(lambda: asyncio.run(main())).result()

func()

# AIRFLOW

In [None]:
# admin = ["iwnoh", "ian"]
# local_tz = pendulum.timezone("Asia/Seoul")
# apply_env = "prd"  # "dev", "stg", "prd"
# gcp_root_directory = "crawling_data/airline_japan"
# use_columns = [
#     'depDate',
#     'depTime',
#     'arrDate',
#     'arrTime',
#     'depCity',
#     'depCityNm',
#     'arrCity',
#     'arrCityNm',
#     'depAirport',
#     'depAirportNm',
#     'arrAirport',
#     'arrAirportNm',
#     'airline',
#     'flightNo',
#     'equipment',
#     'equipmentNm',
#     'booking',
#     'cabin',
#     'seat',
#     'via',
#     'originPrice',
#     'price',
#     'fuel',
#     'tax',
#     'tasf',
#     'que',
#     'etc',
#     'total',
#     'searchDate'
#  ]

In [None]:
# dag = DAG(
#     dag_id = f"crawling_to_google_storage_jejupass_airline_japan_dag_{apply_env}",
#     description = f"{apply_env}_TOPAS 일본 항공권 데이터를 크롤링해서 GoogleStorage에 업로드한다.",
#     start_date = datetime(2022, 12, 27, tzinfo = local_tz),
#     schedule_interval = "0 6 * * *",  # schedule_interval follows the timezone of start_date.
#     tags = ["air", "topas", "japan", "crawling", "google-storage", apply_env] + admin,
# )

# preprocess_task = PythonOperator(
#     task_id = "create_if_not_exist_google_storage",
#     python_callable = create_if_not_exist_google_storage,
#     op_kwargs = {"env": apply_env, "root_directory": gcp_root_directory},
#     provide_context = True,
#     dag = dag,
#     on_failure_callback = partial(on_failure, admin = admin)
# )

# airline_japan_task = PythonOperator(
#     task_id = "airline_japan_task",
#     python_callable = airline_ticket_japan,
#     op_kwargs = {
#         "env": apply_env, 
#         "root_directory": gcp_root_directory, 
#         "use_columns": use_columns,
#         "execution_yyyymmdd_HHMM": "{{execution_date.in_timezone('Asia/Seoul').strftime('%Y%m%d_%H%M')}}",
#     },
#     dag = dag,
#     on_failure_callback = partial(on_failure, admin = admin)
# )

# validate_task = PythonOperator(
#     task_id = "validate_bucket_files",
#     python_callable = validate_bucket_files,
#     op_kwargs = {
#         "env": apply_env
#         , "root_directory": gcp_root_directory},
#     provide_context = True,
#     dag = dag,
#     on_failure_callback = partial(on_failure, admin = admin)
# )

# slack_alert_task = DummyOperator(
#     task_id = "send_to_slack_with_validates",
#     on_success_callback = partial(on_success, task_id = "validate_bucket_files"),
#     dag=dag
# )

# preprocess_task >> airline_japan_task >> validate_task >> slack_alert_task