**Note:**
This data pipeline scrappes raw data from [joburgmarket](http://www.joburgmarket.co.za/dailyprices.php) to an SQL Database.


In [1]:
import pandas as pd
import requests
import urllib
from scrapy.http import HtmlResponse
from sqlalchemy import create_engine, insert, Table, MetaData, select

# Custom upload with connection string
from engine_info import server_info
# From tables.py
import tables

import warnings
warnings.filterwarnings('ignore')

In [2]:
# Url where the data is going to be scrapped from
url = 'http://www.joburgmarket.co.za/dailyprices.php'

In [3]:
# Extracting html content of the url
html = requests.get(url).content
# Creating a response variable to enable web scrapping in the url 
response = HtmlResponse(url=url, body=html)

In [4]:
# Extracting the date for latest information displayed on the website
date = response.xpath('//div[@id="right2"]').css('p b ::text').get()
print(f"The latest information in the Joburg market website is for {date}.")

The latest information in the Joburg market website is for 28 August 2020.


In [5]:
# Creating a connection to MS SQL SERVER
params = urllib.parse.quote_plus(server_info)
engine = create_engine('mssql+pyodbc:///?odbc_connect=%s' % params)
connection = engine.connect()

In [6]:
# Check what is in the database
engine.table_names()

['commodity_raw', 'container_raw', 'product_combination_raw', 'scrapping_date']

## scrapping_date update

In [7]:
metadata = MetaData(bind=engine)

In [8]:
# Dates from the sql date table
scrapped_dates = Table('scrapping_date', metadata, autoload=True, autoload_engine=engine)

In [9]:
stmt = select([scrapped_dates.columns.date])

In [10]:
sql_dates = connection.execute(stmt).fetchall()

In [11]:
date_df = pd.DataFrame([date], columns=[tables.ScrappingDates.__table__.columns.keys()[1]])

In [12]:
date_df

Unnamed: 0,date
0,28 August 2020


In [13]:
# Transfer dataframe to sql table
date_df.to_sql('scrapping_date', con=engine, index=False, if_exists='append')

## commodity_raw update

In [14]:
# Data scrapped comes in as a list, which must be converted to a Dataframe
commodity_df = pd.read_html(url)[0]

In [15]:
commodity_df.head()

Unnamed: 0,Commodity,Total Value Sold,Total Qty Sold,Total Kg Sold,Qty Available
0,APPLES,"R1,118,810.10MTD: R32,486,246.10","12,062MTD: 374,122","151,621MTD: 4,509,363",95504
1,ASPARAGUS,"R7,875.00MTD: R405,670.00",27MTD: 836,"65MTD: 3,144",12
2,ATCHARA,"R50.00MTD: R1,623.60",1MTD: 27,2MTD: 79,203
3,AVOCADOS,"R604,493.00MTD: R16,270,349.40","4,925MTD: 164,693","31,160MTD: 992,019",19001
4,BABY BEET,"R0.00MTD: R2,042.00",0MTD: 41,0MTD: 83,1


In [16]:
# Attach the date column to the dataframe
commodity_df.insert(loc=0, column='Date', value=date)

In [17]:
commodity_df.head()

Unnamed: 0,Date,Commodity,Total Value Sold,Total Qty Sold,Total Kg Sold,Qty Available
0,28 August 2020,APPLES,"R1,118,810.10MTD: R32,486,246.10","12,062MTD: 374,122","151,621MTD: 4,509,363",95504
1,28 August 2020,ASPARAGUS,"R7,875.00MTD: R405,670.00",27MTD: 836,"65MTD: 3,144",12
2,28 August 2020,ATCHARA,"R50.00MTD: R1,623.60",1MTD: 27,2MTD: 79,203
3,28 August 2020,AVOCADOS,"R604,493.00MTD: R16,270,349.40","4,925MTD: 164,693","31,160MTD: 992,019",19001
4,28 August 2020,BABY BEET,"R0.00MTD: R2,042.00",0MTD: 41,0MTD: 83,1


In [18]:
# The commodity_df must be transfered to commodity_raw sql database.
# Rename the column names in DataFrame to match column names in SQL table
commodity_df.columns = tables.Commodity.__table__.columns.keys()[1:] # Exclude the id column

In [19]:
commodity_df.head()

Unnamed: 0,date,commodity,total_value_sold,total_qty_sold,total_kg_sold,qty_available
0,28 August 2020,APPLES,"R1,118,810.10MTD: R32,486,246.10","12,062MTD: 374,122","151,621MTD: 4,509,363",95504
1,28 August 2020,ASPARAGUS,"R7,875.00MTD: R405,670.00",27MTD: 836,"65MTD: 3,144",12
2,28 August 2020,ATCHARA,"R50.00MTD: R1,623.60",1MTD: 27,2MTD: 79,203
3,28 August 2020,AVOCADOS,"R604,493.00MTD: R16,270,349.40","4,925MTD: 164,693","31,160MTD: 992,019",19001
4,28 August 2020,BABY BEET,"R0.00MTD: R2,042.00",0MTD: 41,0MTD: 83,1


In [20]:
# Transfer dataframe to sql table
commodity_df.to_sql('commodity_raw', con=engine, index=False, if_exists='append')

## container_raw update

In [21]:
# Extract value that links it to the commodity website for a detailed stats
commodity_values = response.xpath('//select/option/@value').extract()[1:] # Exclude 'All' option
# Extract commodity name
commodity_names = response.xpath('//table[@class="alltable"]').css('td.tleft2 ::text').extract()

In [22]:
for index, (name, value) in enumerate(zip(commodity_names, commodity_values)):
    # URL for container information which will update value for each commodity
    url_one = f'http://www.joburgmarket.co.za/dailyprices.php?commodity={value}&containerall=1'
    
    # Initially create a DataFrame 
    if not index:
        
        container_df = pd.read_html(url_one)[0]
        # Attach the commodity name column to the container dataframe
        container_df.insert(loc=0, column='commodity', value=name)
        
    else:
        
        # Create a temporary DataFrame
        temp_df = pd.read_html(url_one)[0]
        # Attach the commodity name column to the temporary dataframe
        temp_df.insert(loc=0, column='commodity', value=name)
        # Concatenate temporary DataFrame to the container DataFrame
        container_df = pd.concat([container_df, temp_df], ignore_index=True)

ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

In [None]:
# Attach the date column to the dataframe
container_df.insert(loc=0, column='Date', value=date)

In [None]:
container_df.head()

In [None]:
# The container_df must be transfered to container_raw sql database.
# Rename the column names in DataFrame to match column names in SQL table
container_df.columns = tables.Container.__table__.columns.keys()[1:] # Exclude the id column

In [None]:
container_df.head()

In [None]:
# Transfer dataframe to sql table
container_df.to_sql('container_raw', con=engine, index=False, if_exists='append')

## product_combination_raw update

In [None]:
for index, (name, value) in enumerate(zip(commodity_names, commodity_values)):
    # URL for container information which will update value for each commodity
    url_two = f'http://www.joburgmarket.co.za/dailyprices.php?commodity={value}&containerall=2'
    
    # Initially create a DataFrame 
    if not index:
        
        product_combo_df = pd.read_html(url_two)[0]
        # Attach the commodity name column to the product_combo dataframe
        product_combo_df.insert(loc=0, column='commodity', value=name)
        
    else:
        
        # Create a temporary DataFrame
        temp_df = pd.read_html(url_two)[0]
        # Attach the commodity name column to the temporary dataframe
        temp_df.insert(loc=0, column='commodity', value=name)
        # Concatenate temporary DataFrame to the product_combo DataFrame
        product_combo_df = pd.concat([product_combo_df, temp_df], ignore_index=True)

In [None]:
product_combo_df.tail()

In [None]:
# Attach the date column to the dataframe
product_combo_df.insert(loc=0, column='Date', value=date)

In [None]:
product_combo_df.head()

In [None]:
# The container_df must be transfered to container_raw sql database.
# Rename the column names in DataFrame to match column names in SQL table
product_combo_df.columns = tables.ProductCombination.__table__.columns.keys()[1:] # Exclude the id column

In [None]:
product_combo_df.head()

In [None]:
# Transfer dataframe to sql table
product_combo_df.to_sql('product_combination_raw', con=engine, index=False, if_exists='append')

In [None]:
connection.close()