In [None]:
param1='asd'
param2='asd'
param3='asd'

In [1]:
import logging
import requests
import pandas as pd
import numpy as np
import random
import pyarrow as pa
import pyarrow.parquet as pq

wine_cols = ["ID", "Winery","Name", "Vintage","Country", "Region", "Wine_Style", "Wine_Type", "Wine_Category", "Grape_Type",
            "Grape_ID", 'Rating', 'Review_Count','Price', 'Acidity', "Fizziness", "Intensity", "Sweetness", "Tannin", "Scrape_Date"]

#  This way we can run the pipeline daily and grow naturally and randomly
# But lower numbers are more common than higher numbers
# thus we need a higher density among the lower numbers
def generate_numbers(n, max_val):
    min = np.floor(max_val*0.1).astype(int)
    small_numbers = np.random.exponential(scale=1.0, size=np.floor(n*0.5).astype(int))
    small_numbers = np.floor(small_numbers * 0.5*min).astype(int)
    large_numbers = np.random.exponential(scale=1.0, size=np.floor(n*0.5).astype(int))
    large_numbers = np.floor(large_numbers * 0.5*max_val).astype(int)
    numbers = np.concatenate((small_numbers, large_numbers))
    return list(numbers)

random_grapes = generate_numbers(20, 1500)

temp_df = []

number_of_pages = 10

for y in random_grapes: #y range is the number of grape types (up to 200)  
    for z in [1,2,3,4,7,24]: # z is the wine type (1: red, 2: white, 3: sparkling, 4: rosé 7: dessert wine 24: fortified wine) 
        logging.info(f"Scraping grape {y} and wine type {z}")
        for x in range(1, number_of_pages): # x range is the number of pages (up to ?? - depends on grape)  
            # instead of parsing we found a somewhat unofficial API that we can use to get the data
            # But normally one would only get 2000 results (https://stackoverflow.com/questions/71264253/web-scraping-vivino-using-python)
            # thats why we analyzed all the data one can use as payload to design restarts for the random walk of API scraping
            
            r = requests.get(
            "https://www.vivino.com/api/explore/explore",
            params = {
                #"country_code": "en",
                'grape_ids[]':y,
                #"country_codes[]":["pt", "es", "fr", "de"],
                "currency_code":"EUR",
                #"grape_filter":"varietal",
                "min_rating":"1",
                #"order_by":"price", #  "ratings_average"
                #"order":"asc",
                "page": x,
                "price_range_max":"1500",
                "price_range_min":"0",
                "wine_type_ids[]":z,
                "language":"en",
                "per_page":50
            },
                headers= {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:66.0) Gecko/20100101 Firefox/66.0",  
                'Accept': 'application/json',
                'Accept-Language': 'en-US,en;q=0.5',
            }

            )
            try:
                    
                results = [
                    (
                    f'{t["vintage"]["wine"]["name"]} {t["vintage"]["year"]}',#ID
                    t["vintage"]["wine"]["winery"]["name"], #winery
                    t["vintage"]["wine"]["name"], #Name
                    t["vintage"]["year"], #Vintage
                    t["vintage"]["wine"]["region"]["country"]["name"], #Country
                    t["vintage"]["wine"]["region"]["name"], #region
                    t["vintage"]["wine"]["style"]["seo_name"], # wine style
                    t["vintage"]["wine"]["style"]["varietal_name"], # wine type
                    t["vintage"]["wine"]["type_id"], #wine type by id
                    r.json()["selected_filters"][0]["items"][0]["name"], # grape type
                    r.json()["selected_filters"][0]["items"][0]["id"], # grape id
                    t["vintage"]["statistics"]["ratings_average"], #rating
                    t["vintage"]["statistics"]["ratings_count"],# number of ratings
                    t["price"]["amount"],#price
                    t["vintage"]["wine"]["taste"]["structure"]["acidity"], # wine dimensions 1
                    t["vintage"]["wine"]["taste"]["structure"]["fizziness"],# wine dimensions 2
                    t["vintage"]["wine"]["taste"]["structure"]["intensity"], # wine dimensions 3
                    t["vintage"]["wine"]["taste"]["structure"]["sweetness"],# wine dimensions 4
                    t["vintage"]["wine"]["taste"]["structure"]["tannin"],    # wine dimensions 5
                    # add scrape date as date
                    pd.to_datetime('today').strftime("%d-%m-%Y")
                    )
                    for t in r.json()["explore_vintage"]["matches"]
                    ]
                temp_df.append(results)
            except:
                    pass

if all(isinstance(i, list) for i in temp_df):
    temp_df = [item for sublist in temp_df for item in sublist]  # Flatten the list of lists
    wine_df = pd.DataFrame(temp_df, columns=wine_cols)


In [2]:
wine_df
logging.info(f"Scraped {len(wine_df)} wines")

Unnamed: 0,ID,Winery,Name,Vintage,Country,Region,Wine_Style,Wine_Type,Wine_Category,Grape_Type,Grape_ID,Rating,Review_Count,Price,Acidity,Fizziness,Intensity,Sweetness,Tannin,Scrape_Date
0,The Laird 2016,Torbreck,The Laird,2016,Australia,Barossa Valley,australian-barossa-valley-shiraz,Shiraz,1,Shiraz/Syrah,1,4.8,45,700.00,2.922436,,4.671737,2.431122,3.093863,26-11-2023
1,The Eye of Rã 2016,Glaetzer,The Eye of Rã,2016,Australia,Barossa Valley,australian-barossa-valley-shiraz,Shiraz,1,Shiraz/Syrah,1,0.0,19,434.50,3.249580,,4.549021,1.519860,3.451678,26-11-2023
2,Distenta Syrah 2020,Sine Qua Non,Distenta Syrah,2020,United States,California,californian-syrah,Syrah,1,Shiraz/Syrah,1,4.8,30,350.00,3.084545,,4.482545,1.727454,3.663091,26-11-2023
3,Ligne de Crete Les Grandes Vignes Hermitage 2018,Delas,Ligne de Crete Les Grandes Vignes Hermitage,2018,France,Hermitage,northern-rhone-hermitage,Hermitage,1,Shiraz/Syrah,1,4.8,25,204.70,3.648322,,4.087517,1.631208,4.212551,26-11-2023
4,Harlequin 2016,Zýmē,Harlequin,2016,Italy,Veneto,northern-italy-red,Red,1,Shiraz/Syrah,1,0.0,23,289.98,2.657180,,4.083442,2.438686,3.081630,26-11-2023
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7445,Eulalie 2018,Château de Cénac,Eulalie,2018,France,Cahors,southwest-france-malbec,Malbec,1,Malbec,9,0.0,1,21.00,2.974397,,3.950228,1.250489,3.340130,26-11-2023
7446,The Apple Doesn't Fall Far From The Tree Malbe...,Matías Riccitelli,The Apple Doesn't Fall Far From The Tree Malbec,2020,Argentina,Mendoza,argentinian-mendoza-malbec-red,Malbec,1,Malbec,9,3.9,308,30.55,2.901409,,3.902166,2.289588,2.663687,26-11-2023
7447,Alta Colección Malbec Mendoza 2022,Bodega Piedra Negra,Alta Colección Malbec Mendoza,2022,Argentina,Mendoza,argentinian-mendoza-malbec-red,Malbec,1,Malbec,9,3.9,289,16.87,2.785937,,3.736365,2.153970,2.622693,26-11-2023
7448,Pessac-Léognan (Grand Cru Classé de Graves) 2008,Château Bouscaut,Pessac-Léognan (Grand Cru Classé de Graves),2008,France,Pessac-Léognan,bordeaux-left-bank-pessac-leognan,Pessac-Léognan,1,Malbec,9,3.9,268,37.00,4.257281,,4.302884,1.656766,4.342341,26-11-2023


In [3]:
# map wine type id to wine type
wine_df["Wine_Category"] = wine_df["Wine_Category"].replace({1: "Red", 2: "White", 3: "Sparkling", 4: "Rosé", 7: "Dessert Wine", 24: "Fortified Wine"})

# Remove duplicates
wine_df = wine_df.drop_duplicates(subset=['ID'])

In [4]:
#cleaning region data
wine_df['Region'] = wine_df['Region'].str.replace('Grand Cru', '')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  wine_df['Region'] = wine_df['Region'].str.replace('Grand Cru', '')


In [5]:
#clean NaN values
wine_df['Rating'] = wine_df['Rating'].replace(0, np.nan)
wine_df['Vintage'] = wine_df['Vintage'].replace("N.V.", np.nan)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  wine_df['Rating'] = wine_df['Rating'].replace(0, np.nan)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  wine_df['Vintage'] = wine_df['Vintage'].replace("N.V.", np.nan)


In [6]:
# drop rows of wine_df['Vintage'] with value '' (empty string)
wine_df = wine_df[wine_df['Vintage'] != '']
wine_df['Vintage'] = wine_df['Vintage'].fillna(0)
wine_df['Vintage'].unique()


array([2016, 2020, 2018, 2014, 2015, 2005, 2009, 2019, 2017, 2004, 1998,
       2011, 1990, 2010, 2012, 2013, 2000, 1989, 1999, 2003, 2006, 2007,
       2008, 2021, 2022, 2001, 1987, 1884, 1982, 1995, 1996, 1986, 1997,
       1985, 2002, 1944, 1924, 1919, 1928, 1893, 1994, 1988, 1993, 1983,
       1970,    0, 1948, 1981, 1984, 1991, 1967, 2023, 1954, 1974, 1959,
       1934, 1980, 1964, 1977, 1961, 1966, 1969, 1992, 1953, 1975, 1976,
       1971, 1979, 1978, 1956, 1936, 1965, 1874, 1951, 1968, 1945, 1960,
       1949, 1962, 1955, 1952, 1950, 1947, 1972, 1973, 1946, 1958, 1963])

In [7]:


# redefine column types for parquet (otherwise it will be object)
wine_df['Vintage'] = wine_df['Vintage'].astype(int)
wine_df['Grape_ID'] = wine_df['Grape_ID'].astype(int)
wine_df['Rating'] = wine_df['Rating'].astype(float)
wine_df['Review_Count'] = wine_df['Review_Count'].astype(int)
wine_df['Price'] = wine_df['Price'].astype(float)
wine_df['Acidity'] = wine_df['Acidity'].astype(float)
wine_df['Fizziness'] = wine_df['Fizziness'].astype(float)
wine_df['Intensity'] = wine_df['Intensity'].astype(float)
wine_df['Sweetness'] = wine_df['Sweetness'].astype(float)
wine_df['Tannin'] = wine_df['Tannin'].astype(float)

wine_df.dtypes
logging.info("Transformation done")


ID                object
Winery            object
Name              object
Vintage            int64
Country           object
Region            object
Wine_Style        object
Wine_Type         object
Wine_Category     object
Grape_Type        object
Grape_ID           int64
Rating           float64
Review_Count       int64
Price            float64
Acidity          float64
Fizziness        float64
Intensity        float64
Sweetness        float64
Tannin           float64
Scrape_Date       object
dtype: object

In [8]:
wine_df.to_parquet('/opt/airflow/wine_data.parquet', engine='fastparquet')

ImportError: Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.