In [32]:
import os
import time
import pandas as pd
import requests
from bs4 import BeautifulSoup
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType,DoubleType
from pyspark.sql.functions import col


In [191]:
# Local working directories, relative to the notebook's working directory:
# the scraper writes its raw CSV into input_data/, the cleaned export goes to output_data/.
# (Named *_dir rather than `input`, which would shadow the builtin input().)
input_dir = "input_data"
output_dir = "output_data"
input_path = os.path.join(os.curdir, input_dir)
output_path = os.path.join(os.curdir, output_dir)

for dir_name, dir_path in ((input_dir, input_path), (output_dir, output_path)):
    if not os.path.isdir(dir_path):
        # exist_ok avoids a race if the directory appears between check and create;
        # a regular *file* at this path still raises instead of being silently accepted.
        os.makedirs(dir_path, exist_ok=True)
        print(f"Directory created: {dir_name}")

In [198]:
# Output column -> CSS selector, relative to one listing card.
# Dict insertion order is the CSV column order.
_PRODUCT_FIELDS = {
    "Title": "span.product__title",
    "Location": "p.product__location",
    "description": "p.product__description",
    "Condition": "div.product__tags span",
}


def _parse_product(card):
    """Return one listing card as a dict, or None if any expected field is missing."""
    record = {}
    for column, selector in _PRODUCT_FIELDS.items():
        node = card.select_one(selector)
        if node is None:
            return None
        record[column] = node.get_text().strip()
    return record


def get_category(url, file_name, pages=150, delay=2, timeout=30):
    """Scrape the listing pages of one tonaton.com category into a CSV file.

    Requests ``url?page=1`` .. ``url?page=<pages>`` one after another, parses
    every ``div.product__content`` card and writes the collected records to
    ``file_name`` with pandas' default index (the Spark step later reads that
    index back as the ``_c0`` column).

    Args:
        url: Category URL without a query string.
        file_name: Destination path of the CSV file.
        pages: Number of listing pages to request. Page numbers are 1-based and
            the last page is included.
        delay: Seconds to sleep after each request, to throttle the scraper.
        timeout: Per-request timeout in seconds passed to ``requests.get``.

    Returns:
        The scraped ``pandas.DataFrame`` (also written to ``file_name``). It
        always has the columns Title, Location, description, Condition, even
        when nothing was scraped.
    """
    # NOTE(review): judging by the saved df.show() output, "Title" actually
    # receives the price text (e.g. "GH₵ 355,000") and "description" the listing
    # title. The keys are kept because the Spark cleaning step selects them by name.
    product_list = []
    for page in range(1, pages + 1):  # +1: range() excludes its end, we want the last page too
        print(f"fetching page {page}")
        try:
            resp = requests.get(url, params={"page": page}, timeout=timeout)
            resp.raise_for_status()
        except requests.RequestException as exc:
            # Best effort: skip a failed page instead of losing everything scraped so far.
            print(f"skipping page {page}: {exc}")
        else:
            content = BeautifulSoup(resp.content, "html.parser")
            for card in content.select("div.product__content"):
                record = _parse_product(card)
                if record is not None:  # cards missing a field are skipped
                    product_list.append(record)
        time.sleep(delay)

    # Explicit columns keep the CSV header intact even when no product was parsed.
    dataframe = pd.DataFrame(product_list, columns=list(_PRODUCT_FIELDS))
    dataframe.to_csv(file_name)
    print(f"scraped {len(dataframe)} products into {file_name}")
    return dataframe

# Scrape the vehicles category into input_data/pro.csv. This is network-bound and
# slow: one request per listing page plus a sleep after each one.
url = "https://tonaton.com/c_vehicles"
print("fetching data...")
get_category(url, os.path.join(input_path, "pro.csv"))
print("Done fetching data.")

Fetching data...


In [219]:

# Spark in local mode for the cleaning step.
# NOTE(review): "c:/pyspark/*" is a hard-coded, Windows-only driver classpath;
# presumably it supplies extra JARs for the driver — confirm and move it to config.
conf = (
    SparkConf()
    .setAppName("jiji-scraper-etl")
    .setMaster("local")
    .set("spark.driver.extraClassPath", "c:/pyspark/*")
)
# The builder is the supported entry point and reuses an already-running
# session/context, so re-running this cell is safe.
etl = SparkSession.builder.config(conf=conf).getOrCreate()
sc = etl.sparkContext

In [293]:
# Load the scraped CSV into Spark (header row -> column names; without inferSchema
# every column is a string). The pandas index written by the scraper arrives as `_c0`.
df = etl.read.option("header", True).csv(os.path.join(input_path, "pro.csv"))

In [294]:
# Peek at the raw scrape: first 20 rows, long cell values truncated.
df.show(n=20, truncate=True)

+---+-----------+--------------------+--------------------+------------+
|_c0|      Title|            Location|         description|   Condition|
+---+-----------+--------------------+--------------------+------------+
|  0|GH₵ 355,000|Greater Accra, Ea...|Mercedes-Benz E30...|Foreign Used|
|  1|    GH₵ 150|Greater Accra, Ab...|Toyota Ignition Coil|        Used|
|  2|    GH₵ 250|Greater Accra, Ab...|        Toyota Tyres|   Brand New|
|  3|  GH₵ 1,100|Greater Accra, Ab...|     Ford F150 Tires|   Brand New|
|  4|    GH₵ 200|Greater Accra, Ab...|Jack and Wheel Sp...|   Brand New|
|  5|  GH₵ 1,000|Greater Accra, Ab...|       Seat Covers20|   Brand New|
|  6|  GH₵ 1,000|Greater Accra, Ab...|Luxury Black 9D S...|   Brand New|
|  7|    GH₵ 350|Greater Accra, Ac...|All Kinds of Body...|   Brand New|
|  8|    GH₵ 100|Greater Accra, Ab...|  Lower Arm Bushings|   Brand New|
|  9| GH₵ 20,000|Greater Accra, Ab...|(Corolla-2020)All...|        Used|
| 10|    GH₵ 300|Greater Accra, Ab...|Rim 15 Ring W

In [295]:
# Clean the raw scrape in one projection: rename/reorder the columns to match the
# `product` table (id, Description, Condition, Price, Location) and turn the price
# text into an integer. Columns are referenced by their exact names so the result
# does not depend on Spark's case-insensitive name resolution.
# NOTE(review): the scraped "Title" column actually holds the price text (e.g.
# "GH₵ 355,000"), which is why it becomes Price here.
# NOTE(review): this re-binds `df`, so re-running this cell on its own fails
# (`_c0` no longer exists) — re-run from the CSV read instead.
df = df.select(
    col("_c0").cast(IntegerType()).alias("id"),
    col("description").alias("Description"),
    col("Condition"),
    # Strip the currency letters/symbol, thousands separators and spaces before the
    # cast; text that is still non-numeric becomes null (unless ANSI mode is enabled).
    regexp_replace(col("Title"), r"[GH₵,\s]", "").cast(IntegerType()).alias("Price"),
    col("Location"),
)
   

In [296]:
# Inspect the cleaned frame (default preview: 20 rows, truncated cells).
df.show(truncate=True, n=20)

+---+--------------------+------------+------+--------------------+
| id|         Description|   Condition| price|            Location|
+---+--------------------+------------+------+--------------------+
|  0|Mercedes-Benz E30...|Foreign Used|355000|Greater Accra, Ea...|
|  1|Toyota Ignition Coil|        Used|   150|Greater Accra, Ab...|
|  2|        Toyota Tyres|   Brand New|   250|Greater Accra, Ab...|
|  3|     Ford F150 Tires|   Brand New|  1100|Greater Accra, Ab...|
|  4|Jack and Wheel Sp...|   Brand New|   200|Greater Accra, Ab...|
|  5|       Seat Covers20|   Brand New|  1000|Greater Accra, Ab...|
|  6|Luxury Black 9D S...|   Brand New|  1000|Greater Accra, Ab...|
|  7|All Kinds of Body...|   Brand New|   350|Greater Accra, Ac...|
|  8|  Lower Arm Bushings|   Brand New|   100|Greater Accra, Ab...|
|  9|(Corolla-2020)All...|        Used| 20000|Greater Accra, Ab...|
| 10|Rim 15 Ring Wheel...|   Brand New|   300|Greater Accra, Ab...|
| 11|   Gucci Seat Covers|   Brand New|   600|Gr

In [297]:
# Confirm the casts from the cleaning step: id and the price column should be integer,
# the remaining columns strings.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- Condition: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- Location: string (nullable = true)



In [298]:
# Collect the cleaned Spark frame into driver memory as pandas and export it.
# The pandas index is not written: `id` already identifies each row.
# NOTE(review): re-binds `df` from Spark to pandas, so this cell is not re-runnable
# on its own; the insert loop below relies on `df` being the pandas frame.
df = df.toPandas()
df.to_csv(os.path.join(output_path, "output.csv"), index=False)

In [307]:
import psycopg2
import os


def connect_db(host=None, dbname=None, user=None, password=None):
    """Open an autocommit connection to the ETL Postgres database.

    Each setting comes from the argument if given, else from the standard libpq
    environment variable (PGHOST, PGDATABASE, PGUSER, PGPASSWORD), else from the
    local demo default this notebook was written against.

    Returns:
        A ``(cursor, connection)`` tuple. Autocommit is enabled, so every
        statement executed on the cursor is committed immediately.
    """
    conn = psycopg2.connect(
        host=host or os.environ.get("PGHOST", "127.0.0.1"),
        dbname=dbname or os.environ.get("PGDATABASE", "tutorial"),
        user=user or os.environ.get("PGUSER", "etl"),
        # TODO: remove the hard-coded demo password once PGPASSWORD is configured.
        password=password or os.environ.get("PGPASSWORD", "demopass"),
    )
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    return cur, conn

In [308]:
# Open one autocommit connection; the table-creation and insert cells below use `cur` and `conn`.
cur, conn = connect_db()


In [309]:
# DDL for the target table. Column order matches the cleaned DataFrame, and
# IF NOT EXISTS lets the creation cell be re-run without error.
product_table = """CREATE TABLE IF NOT EXISTS product(
        id INT PRIMARY KEY,
        Description VARCHAR,
        Condition VARCHAR,
        Price INT,
        Location VARCHAR
    )"""

In [310]:
# Create the product table if it does not exist yet. connect_db() enables
# autocommit, so the DDL is committed as it runs and this commit() is effectively a no-op.
cur.execute(product_table)
conn.commit()


In [311]:
# Parameterised INSERT: psycopg2 fills the five %s placeholders, in table-column
# order, from a sequence of values, so data is never spliced into the SQL text.
product_table_insert = """INSERT INTO product(
        id,
        Description,
        Condition,
        Price,
        Location)
    VALUES (%s, %s, %s, %s, %s)
    """

In [312]:
# Load every cleaned row into Postgres. Values are passed positionally, so the
# frame's column order must match the placeholders in product_table_insert.
# Missing values (NaN in pandas, e.g. a price that failed to parse) are mapped to
# None so psycopg2 sends SQL NULL instead of a float NaN, which an INT column rejects.
# NOTE(review): `id` is the PRIMARY KEY, so re-running this cell raises a unique
# violation unless the table is dropped or emptied first.
insert_rows = df.astype(object).where(df.notna(), None)
cur.executemany(product_table_insert, insert_rows.itertuples(index=False, name=None))
    

In [313]:
# Commit the load. connect_db() enables autocommit, so each INSERT was already
# committed as it ran and this call is effectively a no-op kept for clarity.
conn.commit()

In [305]:
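# Reset helper, intentionally commented out: uncomment and run it to drop the
# `product` table before reloading, since inserting the same ids again violates the PRIMARY KEY.
# cur.execute("""DROP TABLE IF EXISTS product""")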