In [27]:
import pandas as pd
import os
import zipfile
import aiohttp
import asyncio
from pathlib import Path
import requests
from bs4 import BeautifulSoup
import boto3
import gzip
import io

  def namedtuple(typename, field_names, *, rename=False, defaults=None, module=None):


In [14]:
# Exercise 1 configuration.
# Zip archives to download.
URLS = [
    "https://github.com/danielbeach/data-engineering-practice/raw/main/Exercises/Exercise-1/data/1.zip",
    "https://github.com/danielbeach/data-engineering-practice/raw/main/Exercises/Exercise-1/data/2.zip",
]
# Local folder that receives the downloaded archives and their extracted contents.
DOWNLOAD_DIR = Path("downloads")

def create_download_dir():
    """Ensure the download directory exists.

    Creates ``DOWNLOAD_DIR`` together with any missing parent directories, so
    the function keeps working if the constant is changed to a nested path.
    Does nothing when the directory is already present.
    """
    DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)


def get_filename_from_url(url):
    """Return the file name found in the last path segment of ``url``.

    Only the path component of the URL is inspected, so a query string or a
    fragment (``.../1.zip?raw=true``) does not end up in the file name, and a
    trailing slash does not produce an empty name.

    Args:
        url: Absolute URL, or a plain ``/``-separated path.

    Returns:
        The last non-empty path segment, or ``""`` when the path is empty.
    """
    # Function-scope import keeps this cell runnable on its own.
    from urllib.parse import urlsplit

    path = urlsplit(url).path
    return path.rstrip("/").rsplit("/", 1)[-1]

def unzip_and_cleanup(zip_path):
    """Unpack the archive at ``zip_path`` into ``DOWNLOAD_DIR`` and delete the archive.

    The archive is removed only after extraction finished without raising.
    """
    archive = zipfile.ZipFile(zip_path, mode="r")
    try:
        archive.extractall(path=DOWNLOAD_DIR)
    finally:
        # Always release the file handle, even when extraction fails.
        archive.close()
    os.remove(zip_path)

async def download_file(session, url):
    """Download one zip archive, unpack it into ``DOWNLOAD_DIR`` and delete the archive.

    Any failure is reported on stdout instead of being raised, so one bad URL
    does not abort the other downloads.

    Args:
        session: The HTTP client session used to issue the GET request.
        url: Address of the zip archive to fetch.

    Returns:
        None.
    """
    filename = get_filename_from_url(url)
    file_path = DOWNLOAD_DIR / filename

    try:
        async with session.get(url) as response:
            if response.status != 200:
                print(f" Lỗi khi tải {url}")
                return
            payload = await response.read()

        # The response is released before any disk work starts. Writing and
        # unzipping are blocking calls, so they run in the default executor
        # and do not stall the other downloads sharing this event loop.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, file_path.write_bytes, payload)
        print(f" Tải thành công: {filename}")
        await loop.run_in_executor(None, unzip_and_cleanup, file_path)

    except Exception as e:
        # Best effort: report the problem and let the remaining downloads continue.
        print(f"⚠️ Lỗi: {e} khi tải {url}")

async def download_all(urls):
    """Download every URL in ``urls`` concurrently over one shared client session.

    NOTE(review): ``download_file`` is looked up by name when this coroutine
    runs, and a later cell of this notebook defines a synchronous function
    with the same name; re-run this cell's definitions before calling this
    again after that cell has executed.
    """
    async with aiohttp.ClientSession() as http:
        await asyncio.gather(*(download_file(http, link) for link in urls))

def main():
    """Create the download directory and start downloading every entry of ``URLS``.

    Returns:
        The scheduled ``asyncio.Task`` when an event loop is already running
        (as in a notebook kernel), so the caller can ``await`` it and keep it
        referenced. ``None`` when no loop is running; in that case the
        downloads have already completed by the time this function returns.
    """
    create_download_dir()
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop (plain script): run the downloads to completion here.
        asyncio.run(download_all(URLS))
        return None
    # A loop is already running, so asyncio.run() cannot be used; schedule the
    # work on that loop and hand the task back to the caller.
    return loop.create_task(download_all(URLS))


if __name__ == "__main__":
    # Hold on to the task: the event loop keeps only a weak reference to it,
    # and the name also allows `await download_task` in a later cell.
    download_task = main()


In [19]:
# Exercise 2 configuration.
# Local folder for the downloaded file; created right away if it is missing.
DOWNLOAD_DIR = Path("downloads")
DOWNLOAD_DIR.mkdir(exist_ok=True)

# Directory listing page that is scanned for the file to download.
BASE_URL = "https://www.ncei.noaa.gov/data/local-climatological-data/access/2021/"
# Value the second table cell of a listing row must equal for its file to be selected.
TARGET_MODIFIED = "2024-01-19 10:27"

def fetch_html(timeout=30):
    """Fetch the HTML of the directory listing at ``BASE_URL``.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive host.

    Returns:
        The response body as text.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    response = requests.get(BASE_URL, timeout=timeout)
    response.raise_for_status()
    return response.text


def find_file(html):
    """Find the link of the first listing row whose second cell equals ``TARGET_MODIFIED``.

    Args:
        html: HTML text containing a table of rows.

    Returns:
        The ``href`` of the link in the first cell of the first matching row,
        or ``None`` when no row matches. Matching rows whose first cell has no
        usable link are skipped instead of raising ``AttributeError``.
    """
    soup = BeautifulSoup(html, "html.parser")

    for row in soup.find_all("tr"):
        cols = row.find_all("td")
        # Rows with fewer than two <td> cells cannot carry both values.
        if len(cols) < 2:
            continue
        if cols[1].text.strip() != TARGET_MODIFIED:
            continue
        link = cols[0].find("a")
        href = link.get("href") if link is not None else None
        if href:
            return href
    return None

def download_file(filename, timeout=60):
    """Download ``BASE_URL + filename`` and store it in ``DOWNLOAD_DIR``.

    NOTE(review): this definition rebinds the name ``download_file`` that an
    earlier cell of this notebook uses for an async function with a different
    signature.

    Args:
        filename: Name of the file relative to ``BASE_URL``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive host.

    Returns:
        Path of the file written to disk.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    url = BASE_URL + filename
    file_path = DOWNLOAD_DIR / filename

    response = requests.get(url, timeout=timeout)
    response.raise_for_status()

    with open(file_path, "wb") as f:
        f.write(response.content)

    return file_path

def analyze_file(file_path):
    """Print and return the record with the highest ``HourlyDryBulbTemperature``.

    The temperature column is converted to numbers before ranking. If the CSV
    contains any non-numeric entry, pandas loads the whole column as strings,
    and a plain sort would then rank lexicographically (``"9"`` above
    ``"100"``). Entries that cannot be parsed as numbers are treated as
    missing and ignored.

    Args:
        file_path: Path of the CSV file to analyse.

    Returns:
        A DataFrame holding the single top record, or an empty DataFrame when
        the file has no usable temperature value.

    Raises:
        KeyError: If the CSV has no ``HourlyDryBulbTemperature`` column.
    """
    column = "HourlyDryBulbTemperature"
    df = pd.read_csv(file_path)
    temperatures = pd.to_numeric(df[column], errors="coerce")
    top_temp = (
        df.assign(**{column: temperatures})
        .dropna(subset=[column])
        .nlargest(1, column)
    )
    print(" Bản ghi có nhiệt độ cao nhất:")
    print(top_temp)
    return top_temp

def main():
    """Locate the target file in the listing, download it and report its hottest record."""
    filename = find_file(fetch_html())

    if filename:
        print(f"🔎 Đã tìm thấy file: {filename}")
        analyze_file(download_file(filename))
    else:
        # Nothing in the listing carries the requested modification time.
        print(" Không tìm thấy file với thời gian chỉnh sửa yêu cầu.")


if __name__ == "__main__":
    main()

🔎 Đã tìm thấy file: 01023199999.csv
📈 Bản ghi có nhiệt độ cao nhất:
      STATION                 DATE  LATITUDE  LONGITUDE  ELEVATION  \
0  1023199999  2021-09-22T12:20:00     64.35        7.8        0.0   

          NAME REPORT_TYPE  SOURCE  HourlyAltimeterSetting  \
0  DRAUGEN, NO       FM-15       4                   29.62   

   HourlyDewPointTemperature  ...  BackupDirection  BackupDistance  \
0                       45.0  ...              NaN             NaN   

  BackupDistanceUnit  BackupElements  BackupElevation  BackupEquipment  \
0                NaN             NaN              NaN              NaN   

  BackupLatitude  BackupLongitude  BackupName  WindEquipmentChangeDate  
0            NaN              NaN         NaN                      NaN  

[1 rows x 125 columns]


In [28]:
# Exercise 3 configuration.
# Key of the gzip-compressed listing; its first line is used as the key of the object to stream.
PATH_GZ = "crawl-data/CC-MAIN-2022-05/wet.paths.gz"
# S3 bucket that holds both the listing and the object it points to.
BUCKET = "commoncrawl"

def download_gz_file_in_memory(bucket, key):
    """Read the S3 object ``key`` from ``bucket`` fully into memory and return its bytes.

    boto3 must be able to locate AWS credentials; the output recorded for this
    cell shows the call failing with ``NoCredentialsError`` when it cannot.
    """
    client = boto3.client("s3")
    return client.get_object(Bucket=bucket, Key=key)["Body"].read()

def extract_first_uri_from_gz(gz_bytes):
    """Return the first line of gzip-compressed ``gz_bytes`` as stripped UTF-8 text.

    Returns ``None`` when the decompressed content is empty.
    """
    with gzip.open(io.BytesIO(gz_bytes), "rb") as stream:
        # Only the first line is needed; the rest is never decompressed.
        first_line = stream.readline()
    return first_line.decode("utf-8").strip() if first_line else None

def stream_file_from_s3(bucket, key):
    """Stream the S3 object ``key`` from ``bucket`` and print it line by line.

    Objects whose key ends in ``.gz`` are decompressed on the fly; decoding
    their raw compressed bytes as UTF-8 would fail. Other objects are printed
    as-is. The response body is always closed, including when printing or
    decoding raises.

    Args:
        bucket: Name of the S3 bucket.
        key: Key of the object to print.
    """
    s3 = boto3.client("s3")
    body = s3.get_object(Bucket=bucket, Key=key)["Body"]

    print(f"\n Đang in nội dung từ {key}:\n")
    try:
        if key.endswith(".gz"):
            # GzipFile reads from the streaming body incrementally, so the
            # object is never held in memory as a whole.
            with gzip.GzipFile(fileobj=body) as unzipped:
                for line in unzipped:
                    print(line.decode("utf-8").rstrip("\r\n"))
        else:
            for line in body.iter_lines():
                print(line.decode("utf-8"))
    finally:
        body.close()

def main():
    """Fetch the gzip listing, take its first entry and print the object it names."""
    print(" Tải tệp .gz về từ S3...")
    listing_bytes = download_gz_file_in_memory(BUCKET, PATH_GZ)

    print(" Giải nén và đọc dòng đầu tiên...")
    first_key = extract_first_uri_from_gz(listing_bytes)
    print(f"🔗 URI đầu tiên là: {first_key}")

    # The first listing entry is itself a key inside the same bucket.
    print(" Stream nội dung từ file URI...")
    stream_file_from_s3(BUCKET, first_key)


if __name__ == "__main__":
    main()

 Tải tệp .gz về từ S3...


NoCredentialsError: Unable to locate credentials

In [26]:
!pip install boto3

