In [1]:
from appium import webdriver
from appium.options.android import UiAutomator2Options
from appium.webdriver.common.appiumby import AppiumBy
from appium.webdriver.common.touch_action import TouchAction
from selenium.webdriver.common.action_chains import ActionChains, ActionBuilder
from selenium.webdriver.common.actions.pointer_input import PointerInput
from selenium.webdriver.common.actions import interaction

from bs4 import BeautifulSoup as bs
import re
import numpy as np
import pandas as pd
import time
from datetime import datetime

# Pauses, in seconds, handed to time.sleep() between UI interactions.
swipe_down_delay = swipe_up_delay = 1  # swipe_down_delay: wait after each scroll swipe
action_delay = 5    # wait after taps, key presses and text entry
launch_delay = 10   # wait after the app is (re)activated

# define swipe
def swipe(driver, start, end):
    """Drag one touch pointer across the screen from ``start`` to ``end``.

    Args:
        driver: Session object handed to ``ActionChains`` and ``ActionBuilder``.
        start: Indexable pair; ``start[0]`` / ``start[1]`` is where the pointer goes down.
        end: Indexable pair; ``end[0]`` / ``end[1]`` is where the pointer is released.
    """
    from_x, from_y = start[0], start[1]
    to_x, to_y = end[0], end[1]

    chain = ActionChains(driver)
    # Swap the default pointer for a touch pointer so the gesture is a finger drag.
    touch_pointer = PointerInput(interaction.POINTER_TOUCH, "touch")
    chain.w3c_actions = ActionBuilder(driver, mouse=touch_pointer)

    finger = chain.w3c_actions.pointer_action
    finger.move_to_location(from_x, from_y)
    finger.pointer_down()
    finger.move_to_location(to_x, to_y)
    finger.release()

    chain.perform()

def click(driver, x, y):
    """Tap the screen at (``x``, ``y``) with a single touch pointer.

    Args:
        driver: Session object handed to ``ActionChains`` and ``ActionBuilder``.
        x: Horizontal screen coordinate of the tap.
        y: Vertical screen coordinate of the tap.
    """
    chain = ActionChains(driver)
    touch_pointer = PointerInput(interaction.POINTER_TOUCH, "touch")
    chain.w3c_actions = ActionBuilder(driver, mouse=touch_pointer)

    finger = chain.w3c_actions.pointer_action
    finger.move_to_location(x, y)
    finger.pointer_down()
    finger.pause(0.1)  # hold briefly between press and release
    finger.release()

    chain.perform()

def process_toko_pulsa(df, telco, category):
    """Clean the raw rows scraped from a Pulsa (airtime) product list.

    Args:
        df: Iterable of DataFrames accepted by ``pd.concat``. Columns 0, 1 and 2
            are renamed to ``product_value``, ``price`` and ``SKU``; columns 3
            and 4 are discarded when present.
        telco: Value written to the ``brand`` column.
        category: Value written to the ``category`` column.

    Returns:
        DataFrame holding only the rows whose ``product_value`` starts with
        ``Rp``, with the characters ``R``, ``p`` and ``.`` removed from
        ``product_value`` and ``price`` (both remain strings), plus ``brand``
        and ``category`` columns, de-duplicated.
    """
    # Raw string: '\.' inside a normal string literal is an invalid escape sequence.
    currency_chars = r'[Rp.]'
    return (
        pd.concat(df)
        .rename(columns={
            0: 'product_value',
            1: 'price',
            2: 'SKU',
        })
        .loc[lambda x: x.product_value.astype(str).str.contains('^Rp')]
        # Screens do not always yield the extra text fields; tolerate their absence.
        .drop(columns=[3, 4], errors='ignore')
        .assign(
            price=lambda x: x.price.str.replace(currency_chars, '', regex=True),
            product_value=lambda x: x.product_value.str.replace(currency_chars, '', regex=True),
            brand=telco,
            category=category,
        )
        .drop_duplicates()
    )

def process_toko_paket(df, telco, category):
    """Clean the raw rows scraped from a Paket Data (data package) list.

    Args:
        df: Iterable of DataFrames accepted by ``pd.concat``. Columns 0, 1, 2,
            3 and 5 are renamed to ``SKU``, ``price``, ``discount``,
            ``product_value`` and ``note``; column 4 is discarded when present.
        telco: Value written to the ``brand`` column.
        category: Value written to the ``category`` column.

    Returns:
        DataFrame holding only the rows whose ``price`` starts with ``Rp``,
        with the characters ``R``, ``p`` and ``.`` removed from ``price`` and
        ``product_value`` (both remain strings), plus ``brand`` and
        ``category`` columns, de-duplicated.
    """
    # Raw string: '\.' inside a normal string literal is an invalid escape sequence.
    currency_chars = r'[Rp.]'
    return (
        pd.concat(df)
        .rename(columns={
            0: 'SKU',
            1: 'price',
            2: 'discount',
            3: 'product_value',
            5: 'note',
        })
        # 'Lihat Detail' is a button caption picked up with the row, not data.
        .replace('Lihat Detail', None)
        .loc[lambda x: x.price.astype(str).str.contains('^Rp')]
        # Column 4 is not present on every screen; tolerate its absence.
        .drop(columns=[4], errors='ignore')
        .assign(
            price=lambda x: x.price.str.replace(currency_chars, '', regex=True),
            product_value=lambda x: x.product_value.str.replace(currency_chars, '', regex=True),
            brand=telco,
            category=category,
        )
        .drop_duplicates()
    )

def process_toko_pln(df):
    """Clean the raw rows scraped from the PLN product list.

    Args:
        df: Iterable of DataFrames accepted by ``pd.concat``. Columns 0 and 1
            are renamed to ``SKU`` and ``price``; columns 2 and 3 are discarded
            when present.

    Returns:
        DataFrame holding only the rows whose ``price`` starts with ``Rp``.
        ``price`` has the characters ``R``, ``p`` and ``.`` removed,
        ``product_value`` is ``SKU`` with the same characters removed, and
        ``category`` is the constant ``'PLN'``. Duplicate rows are dropped.
    """
    # Raw string: '\.' inside a normal string literal is an invalid escape sequence.
    currency_chars = r'[Rp.]'
    return (
        pd.concat(df)
        .rename(columns={
            0: 'SKU',
            1: 'price',
        })
        .loc[lambda x: x.price.astype(str).str.contains('^Rp')]
        # The extra text fields are not present on every screen; tolerate their absence.
        .drop(columns=[2, 3], errors='ignore')
        .assign(
            price=lambda x: x.price.str.replace(currency_chars, '', regex=True),
            product_value=lambda x: x.SKU.str.replace(currency_chars, '', regex=True),
            category='PLN',
        )
        .drop_duplicates()
    )

def process_toko_voucher(df, brand):
    """Clean the raw rows scraped from a Voucher Game product list.

    Args:
        df: Iterable of DataFrames accepted by ``pd.concat``. Columns 0, 1, 2
            and 3 are renamed to ``SKU``, ``disc_pct``, ``base_price`` and
            ``disc_price``.
        brand: Value written to the ``brand`` column.

    Returns:
        DataFrame holding only the rows whose ``base_price`` starts with
        ``Rp``, with the characters ``R``, ``p`` and ``.`` removed from
        ``base_price`` and ``disc_price`` (both remain strings), plus ``brand``
        and a constant ``category`` of ``'Voucher Game'``, de-duplicated.
    """
    # An earlier, unused "cleaned" pipeline used to run here. It assigned the 2-D
    # result of np.where(x.isnull(), 1, 0) as a single column, which raises for
    # any multi-column frame, so the function failed before reaching its return.
    # Raw string: '\.' inside a normal string literal is an invalid escape sequence.
    currency_chars = r'[Rp.]'
    return (
        pd.concat(df)
        .rename(columns={
            0: 'SKU',
            1: 'disc_pct',
            2: 'base_price',
            3: 'disc_price',
        })
        .loc[lambda x: x.base_price.astype(str).str.contains('^Rp')]
        .assign(
            base_price=lambda x: x.base_price.str.replace(currency_chars, '', regex=True),
            disc_price=lambda x: x.disc_price.str.replace(currency_chars, '', regex=True),
            brand=brand,
            category='Voucher Game',
        )
        .drop_duplicates()
    )

In [2]:
# Prerequisite: start the Appium server first by running "appium --allow-cors" in a terminal.

# Options describing the target Android device/emulator for the UiAutomator2 driver.
# NOTE(review): these values are assigned as camelCase attributes. The Appium Python
# client documents snake_case option properties (e.g. platform_version, device_name),
# so confirm that automationName/platformName/platformVersion/deviceName actually
# reach the session capabilities -- TODO verify against the installed client version.
options = UiAutomator2Options()
options.automationName = 'UiAutomator2'
options.udid = 'emulator-5554'
options.platformName = 'Android'
options.platformVersion = '12'
options.deviceName = 'bwphone'

# Open a session against the Appium server at 127.0.0.1:4723; every later cell uses `driver`.
driver = webdriver.Remote('http://127.0.0.1:4723', options=options)

In [3]:
# Silence every warning category for the rest of the session.
import warnings

warnings.simplefilter('ignore')

In [4]:
# Phone-number prefix typed into the app's number field to select each operator.
telcos_prefix = {
    'smartfren': '0881',
    'telkomsel': '0812',
    'im3': '0814',
    'xl': '0818',
    'axis': '0831',
    '3': '0894',
}

# Android key codes passed to driver.press_keycode (android.view.KeyEvent constants).
KEYCODE_HOME = 3
KEYCODE_BACK = 4

APP_PACKAGE = 'com.tokopedia.kelontongapp'
LAUNCHER_ACTIVITY = '.NexusLauncherActivity'
END_OF_LIST_MARKER = 'Mengapa Harus'  # text whose presence in the page source ends a scroll
MAX_BACK_PRESSES = 10                 # BACK presses to try before falling back to HOME
MAX_SWIPES_PER_LIST = 100             # safety cap so a missing marker cannot loop forever
PULSA_TILE_XY = (118, 1313)           # the Pulsa tile is tapped by coordinate
SCROLL_FROM, SCROLL_TO = (500, 1400), (500, 800)
PHONE_INPUT_XPATH = '//android.widget.EditText'
INTERSTITIAL_CLOSE_XPATH = '/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.view.ViewGroup/android.widget.RelativeLayout/android.webkit.WebView/android.webkit.WebView/android.view.View[2]/android.app.Dialog/android.view.View/android.view.View[1]/android.widget.Button'


def extract_price_rows(source):
    """Build one row per on-screen element whose text starts with 'Rp'.

    Args:
        source: Page source string as returned by ``driver.page_source``.

    Returns:
        DataFrame with one row per matching element, holding every ``text="..."``
        value found in that element's parent, or ``None`` when nothing matches
        (``pd.concat`` raises on an empty list).
    """
    soup = bs(source, 'html')
    price_nodes = soup.find_all(attrs={'text': re.compile(r'^Rp.+$')})
    if not price_nodes:
        return None
    return pd.concat(
        [pd.Series(re.findall(r'text="(.+)"', str(node.parent))) for node in price_nodes],
        axis=1,
    ).T


def scrape_price_rows(driver, max_swipes=MAX_SWIPES_PER_LIST):
    """Scroll through the current product list and collect the rows of every screen.

    Args:
        driver: Active Appium session showing a product list.
        max_swipes: Upper bound on the number of screens inspected.

    Returns:
        List of DataFrames, one per screen that showed at least one price label.
    """
    collected = []
    for _ in range(max_swipes):
        source = driver.page_source
        screen_rows = extract_price_rows(source)
        if screen_rows is not None:
            collected.append(screen_rows)
        if END_OF_LIST_MARKER in source:
            break
        swipe(driver, SCROLL_FROM, SCROLL_TO)
        time.sleep(swipe_down_delay)
    else:
        print(f'warning: "{END_OF_LIST_MARKER}" not seen after {max_swipes} screens; list may be incomplete')
    return collected


def enter_phone_prefix(driver, prefix):
    """Type ``prefix`` into the number field, then press BACK and wait."""
    number_field = driver.find_element(by=AppiumBy.XPATH, value=PHONE_INPUT_XPATH)
    number_field.send_keys(prefix)
    time.sleep(action_delay)
    # Presumably dismisses the soft keyboard so the product list is visible -- confirm on device.
    driver.press_keycode(KEYCODE_BACK)
    time.sleep(action_delay)


def open_menu(driver, label):
    """Tap the home-screen entry whose text equals ``label`` and wait."""
    driver.find_element(by=AppiumBy.XPATH, value=f'//android.view.View[@text="{label}"]').click()
    time.sleep(action_delay)


def dismiss_interstitial(driver):
    """Close the interstitial banner when one is showing; otherwise do nothing."""
    try:
        driver.find_element(by=AppiumBy.XPATH, value=INTERSTITIAL_CLOSE_XPATH).click()
        time.sleep(action_delay)
    except Exception:
        # Best effort: the banner is usually absent. Catching Exception rather than
        # using a bare except keeps a kernel interrupt (KeyboardInterrupt) working.
        pass


def go_back(driver):
    """Press BACK once and wait for the UI to settle."""
    driver.press_keycode(KEYCODE_BACK)
    time.sleep(action_delay)


# Press BACK until the launcher is showing. The previous key code (5) is
# KEYCODE_CALL, which opens the dialer instead of navigating back. If BACK alone
# does not get there, fall back to HOME rather than looping forever.
for _ in range(MAX_BACK_PRESSES):
    if driver.current_activity == LAUNCHER_ACTIVITY:
        break
    go_back(driver)
else:
    driver.press_keycode(KEYCODE_HOME)
    time.sleep(action_delay)

# Open the app, then wait for it to load.
driver.activate_app(APP_PACKAGE)
time.sleep(launch_delay)

# A popup appears on launch. BACK closes it; if that also left the app, go back in.
driver.press_keycode(KEYCODE_BACK)
time.sleep(launch_delay)
if driver.current_package != APP_PACKAGE:
    driver.activate_app(APP_PACKAGE)
    time.sleep(launch_delay)


results = []

p = 'Pulsa'
for telco, prefix in telcos_prefix.items():
    # Go into Pulsa (easier to tap the tile by coordinate).
    click(driver, *PULSA_TILE_XY)
    time.sleep(action_delay)

    enter_phone_prefix(driver, prefix)

    rows = scrape_price_rows(driver)
    if rows:
        results.append(process_toko_pulsa(rows, telco, p))
    else:
        print(f'warning: no price rows found for {p} {telco}; skipped')

    go_back(driver)

p = 'Paket Data'
for telco, prefix in telcos_prefix.items():
    print(p, telco)

    dismiss_interstitial(driver)
    open_menu(driver, p)
    enter_phone_prefix(driver, prefix)

    rows = scrape_price_rows(driver)
    if rows:
        results.append(process_toko_paket(rows, telco, p))
    else:
        print(f'warning: no price rows found for {p} {telco}; skipped')

    go_back(driver)

p = 'PLN'
open_menu(driver, p)
go_back(driver)

rows = scrape_price_rows(driver)
if rows:
    results.append(process_toko_pln(rows))
else:
    print(f'warning: no price rows found for {p}; skipped')


# --- Voucher Game scraping (disabled, kept for reference) ---------------------
# VOUCHER_BOX_XPATH = "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.view.ViewGroup/android.widget.RelativeLayout/android.webkit.WebView/android.webkit.WebView/android.view.View/android.view.View[{box}]"
# VOUCHER_BRAND_XPATH = VOUCHER_BOX_XPATH.format(box=5)
#
# p = 'Voucher Game'
# open_menu(driver, p)
#
# for boxes in range(5, 14):
#     driver.find_element(by=AppiumBy.XPATH, value=VOUCHER_BOX_XPATH.format(box=boxes)).click()
#     time.sleep(action_delay)
#     brand = driver.find_element(by=AppiumBy.XPATH, value=VOUCHER_BRAND_XPATH).text
#     rows = scrape_price_rows(driver)
#     results.append(process_toko_voucher(rows, brand=brand))
#     go_back(driver)
#
# for swipes in range(1, 5):
#     for boxes in range(1, 19):
#         driver.find_element(by=AppiumBy.XPATH, value=VOUCHER_BOX_XPATH.format(box=boxes)).click()
#         brand = driver.find_element(by=AppiumBy.XPATH, value=VOUCHER_BRAND_XPATH).text
#         time.sleep(action_delay)
#         rows = scrape_price_rows(driver)
#         results.append(process_toko_voucher(rows, brand=brand))
#         go_back(driver)


# Combine every category into one frame and tag where the data came from.
results_df = (
    pd.concat(results)
    .drop_duplicates()
    .assign(
        platform='mitra tokopedia',
    )
)

Paket Data smartfren
Paket Data telkomsel
Paket Data im3
Paket Data xl
Paket Data axis
Paket Data 3


In [5]:
# Scratch cell: snapshot the price rows of whatever screen the device shows right now.
# Note that this overwrites the `rows`, `source`, `soup` and `new_rows` globals.
rows = []
source = driver.page_source
soup = bs(source, 'html')
price_nodes = soup.find_all(attrs={'text': re.compile(r'^Rp.+$')})
if price_nodes:
    new_rows = pd.concat(
        [pd.Series(re.findall(r'text="(.+)"', str(node.parent))) for node in price_nodes],
        axis=1,
    ).T
    rows.append(new_rows)
else:
    # pd.concat raises on an empty list, so report the empty screen instead.
    new_rows = pd.DataFrame()
    print('no "Rp" price labels on the current screen')

In [6]:
from datetime import date
from pathlib import Path

# Save the combined scrape as result/tokopedia_scrape_<YYYY_MM_DD>.csv.
today = date.today().strftime("%Y_%m_%d")
# to_csv does not create missing directories, so make sure the target folder exists.
Path('result').mkdir(parents=True, exist_ok=True)
results_df.to_csv(f'result/tokopedia_scrape_{today}.csv')