Skip to content

Commit

Permalink
Merge pull request #1 from tses89214/feature/download_data
Browse files Browse the repository at this point in the history
Feature/download data
  • Loading branch information
tses89214 committed Apr 30, 2024
2 parents f3e2bcc + 2cf7539 commit cae4f2e
Show file tree
Hide file tree
Showing 19 changed files with 570 additions and 1 deletion.
29 changes: 29 additions & 0 deletions .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# CI workflow: run the unit-test suite on every push (all branches and v* tags).
name: UnitTest

on:
  push:
    branches:
      - "**" # matches every branch
    tags:
      - "v**"

jobs:
  test:
    runs-on: ubuntu-latest
    permissions:
      packages: write
      contents: read
      id-token: write
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.9"
          # Cache pip downloads keyed on requirements.txt to speed up runs.
          cache: "pip"

      - name: Install dependencies
        run: python -m pip install --upgrade pip setuptools && python -m pip install -r requirements.txt

      - name: Run test
        run: pytest
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,8 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# test ipynb file
*.ipynb

credentials.json
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# ubike-historycial-data
# ubike-historical-data
因目前 ubike 相關數據僅提供實時數據,因此想新增一個專案來累積 ubike 站點車輛數的歷史數據
3 changes: 3 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

plugins = ["pytest-bigquery-mock"]
37 changes: 37 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""
Here is the main entrypoint of this project.
"""
import hashlib
import dotenv

from src.crawler import download_data, split_table
from src.db_connector import BQConnector


def main():
    """
    Main entrypoint: download the current ubike snapshot and persist it.

    Sites are rewritten only when their content hash differs from the
    stored one; slot readings are appended on every run.
    """
    dotenv.load_dotenv(override=True)

    connector = BQConnector()

    raw_records = download_data()
    sites, slots = split_table(raw_records)

    # Hash the rendered sites table so an unchanged snapshot can skip
    # the (relatively expensive) overwrite below.
    digest = hashlib.md5(sites.to_string().encode()).hexdigest()
    md5_matches = connector.check_md5_for_update(digest)

    if md5_matches is False:
        connector.overwrite_sites(sites)
        connector.overwrite_site_md5(digest)

    # Slot counts are time-series data, so they are always appended.
    connector.append_slots(slots)


if __name__ == "__main__":
    main()
4 changes: 4 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[pytest]
addopts= -vv --cov .
markers =
bq_query_return_data: bq_query_return_data
9 changes: 9 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
requests==2.28.1
pandas==2.1.1
pytest==8.0.1
google-cloud-bigquery==3.21.0
pytest-bigquery-mock==0.1.2
requests-mock==1.11.0
python-dotenv==0.21.1
pyarrow==16.0.0
pytest-cov==4.1.0
Empty file added src/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions src/alarm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
This module contains some alarm system,
currently we have TG chatbot.
"""

import requests
import os


class TGBot:
    """
    Telegram chatbot used as a simple alarm channel.

    Credentials are read from the environment variables ``tg_token``
    and ``tg_chat_id`` (both may be None if unset — TODO confirm the
    caller guarantees they are configured).
    """

    def __init__(self) -> None:
        self.token = os.getenv("tg_token")
        self.chat_id = os.getenv("tg_chat_id")

    def send_message(self, text: str):
        """
        Send a text message to the configured chat.

        Args:
            text (str): text you want to send.
        Returns:
            requests.Response: the Telegram API response.
        """
        # Pass chat_id/text via ``params`` so requests URL-encodes them.
        # Interpolating raw text into the URL breaks messages containing
        # '&', '#', '+' or whitespace/newlines.
        return requests.get(
            url=f'https://api.telegram.org/bot{self.token}/sendMessage',
            params={'chat_id': self.chat_id, 'text': text},
            timeout=60
        )
56 changes: 56 additions & 0 deletions src/crawler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Some functions to download data from ubike api.
"""
from typing import List, Dict

import requests
import pandas as pd


def download_data():
    """
    Fetch the current ubike snapshot from the open-data endpoint.

    Returns:
        data: List of json(records), one per station.
    """
    endpoint = (
        'https://tcgbusfs.blob.core.windows.net/dotapp/'
        'youbike/v2/youbike_immediate.json'
    )
    response = requests.get(endpoint, timeout=60)
    return response.json()


def split_table(records: List[Dict]):
    """
    Split the raw API records into Sites and Slots tables.

    Args:
        records: List of Dict. The raw data fetched from the API.
    Returns:
        (Sites, Slots):
            Sites table fields:
                - sno
                - sna
                - tot
                - sarea
                - lat
                - lng
                - ar
                - sareaen
                - aren
                - act
            Slots table fields:
                - sno
                - sbi
                - infoTime (parsed to datetime64)
    """
    raw_table = pd.DataFrame(records)
    sites = raw_table[['sno', 'sna', 'tot', 'sarea',
                       'lat', 'lng', 'ar', 'sareaen', 'aren', 'act']]
    # .copy() detaches slots from raw_table: assigning into a slice view
    # raises SettingWithCopyWarning and may not write through at all.
    slots = raw_table[['sno', 'sbi', 'infoTime']].copy()
    slots['infoTime'] = pd.to_datetime(slots['infoTime'])

    return sites, slots
140 changes: 140 additions & 0 deletions src/db_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""
This modules contains connector used to Connect to databases.
Currently we only has BigQuery Connector(BQConnector).
"""
from google.cloud import bigquery
import pandas as pd

from src.table import sites, slots, site_md5


class BQConnector:
    """
    This class is used to Connect to BigQuery,
    To keep things simple, store and write logic are included in this script.
    """

    def __init__(self, client: bigquery.Client = None) -> None:
        # Allow injecting a client (e.g. a mock in tests); otherwise build
        # one from ambient credentials.
        self._client = bigquery.Client() if client is None else client

    def check_md5_for_update(self, site_data_md5: str) -> bool:
        """
        Check is md5 same or not.
        Args:
            site_data_md5: str. the md5 of site data.
        Returns:
            bool: whether md5 is same. False when the md5 table is empty.
        """
        query = 'SELECT md5 FROM `ubike-crawler.ubike_data.site_md5` LIMIT 1 '
        query_job = self._client.query(query)
        rows = query_job.result()

        # Default to None so an empty table compares unequal (and thus
        # triggers an overwrite) instead of raising NameError.
        current_md5 = None
        for row in rows:
            current_md5 = row['md5']

        return current_md5 == site_data_md5

    def overwrite_sites(self, sites_data: pd.DataFrame):
        """
        Overwrite sites table.
        Args:
            sites_data (pd.DataFrame): sites data.
        Returns:
            job results.
        """
        table_id = 'ubike-crawler.ubike_data.sites'
        job_config = bigquery.LoadJobConfig(
            schema=sites.to_bq_schema(),
            write_disposition='WRITE_TRUNCATE'
        )
        job = self._client.load_table_from_dataframe(
            sites_data, table_id, job_config=job_config
        )
        return job.result()

    def overwrite_site_md5(self, site_md5_data: str):
        """
        Overwrite site_md5 table.
        Args:
            site_md5_data (str): the md5 hash of the sites data.
        Returns:
            job results.
        """
        table_id = 'ubike-crawler.ubike_data.site_md5'
        job_config = bigquery.LoadJobConfig(
            schema=site_md5.to_bq_schema(),
            write_disposition='WRITE_TRUNCATE'
        )
        # Wrap the single hash in a one-row DataFrame for the load API.
        job = self._client.load_table_from_dataframe(
            pd.DataFrame({'md5': site_md5_data}, index=[0]),
            table_id,
            job_config=job_config
        )
        return job.result()

    def append_slots(self, slots_data: pd.DataFrame):
        """
        Append to the slots table.
        Args:
            slots_data (pd.DataFrame): slots data.
        Returns:
            job results.
        """
        table_id = 'ubike-crawler.ubike_data.slots'
        job_config = bigquery.LoadJobConfig(
            schema=slots.to_bq_schema(),
            write_disposition='WRITE_APPEND'
        )
        job = self._client.load_table_from_dataframe(
            slots_data, table_id, job_config=job_config
        )
        return job.result()

    def read_sites(self) -> pd.DataFrame:
        """
        Read the whole sites table.
        Returns:
            pd.DataFrame: all rows of the sites table.
        """
        query = 'SELECT * FROM `ubike-crawler.ubike_data.sites`'
        query_job = self._client.query(query)
        return query_job.to_dataframe()

    def read_slots(self) -> pd.DataFrame:
        """
        Read the slots table of specific time range (8 days ago through
        yesterday, inclusive).
        Returns:
            pd.DataFrame: slot rows within that date range.
        """
        query = """
            SELECT * FROM `ubike-crawler.ubike_data.slots`
            WHERE DATE(infoTime) BETWEEN
                DATE_SUB(CURRENT_DATE(), INTERVAL 8 DAY)
                AND
                DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
        """
        query_job = self._client.query(query)
        return query_job.to_dataframe()

    def clean_slots(self):
        """
        Clean last week data (8 days ago through yesterday) from the
        slots table. Mirrors the range used by read_slots.
        """
        query = """
            DELETE FROM `ubike-crawler.ubike_data.slots`
            WHERE DATE(infoTime) BETWEEN
                DATE_SUB(CURRENT_DATE(), INTERVAL 8 DAY)
                AND
                DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
        """
        delete_job = self._client.query(query)
        delete_job.result()
Loading

0 comments on commit cae4f2e

Please sign in to comment.