In [1]:
import os  # Environment access (database credentials)

import MySQLdb  # MySQL database connector
import pygsheets  # Google Sheets API wrapper
import gspread  # Alternative Google Sheets API wrapper
import pandas as pd  # Pandas for data handling
from pyspark.sql.types import DecimalType, IntegerType  # Spark data types
from pyspark.sql.functions import *  # Import all Spark SQL functions
from pyspark.sql.window import Window  # For window functions
from pyspark.sql.functions import concat, col, lit  # Specific Spark functions
from pyspark.sql import SparkSession, SQLContext  # Spark session and SQL context setup

# Build (or reuse) the Spark session for this job, then expose the
# SparkContext and the SQLContext wrapper used by the queries below
spark = (
    SparkSession.builder
    .appName("Category_Blocking")
    .getOrCreate()
)
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Establish MySQL database connection.
# The password is read from the environment so it never lives in the notebook
# source, its saved outputs, or version history. The non-secret settings keep
# their previous values as defaults and can be overridden the same way.
# NOTE(review): the password that used to be written here should be rotated.
db_password = os.environ.get("CAMPAIGNS_DB_PASSWORD")
if not db_password:
    # Fail early with an actionable message instead of a driver-level auth error
    raise RuntimeError(
        "CAMPAIGNS_DB_PASSWORD is not set; export it before running this notebook"
    )

conn = MySQLdb.connect(
    user=os.environ.get("CAMPAIGNS_DB_USER", "alpha"),
    password=db_password,
    host=os.environ.get("CAMPAIGNS_DB_HOST", "campaignsdb.alphonso.tv"),
    database=os.environ.get("CAMPAIGNS_DB_NAME", "Campaigns_DB"),
    port=int(os.environ.get("CAMPAIGNS_DB_PORT", "3398")),
)
print("Connection Established")

# Open the tracking spreadsheet through pygsheets with the service-account key
gc = pygsheets.authorize(
    service_file="/home/alpha/notebook/Vatsa/Domain_to_brand/client_secret_new.json"
)
sheetUrl = "https://docs.google.com/spreadsheets/d/1YIN7zyvB6ylST8b571uFB3rzwXX-XNY8J31bX6OmSpo/edit?usp=sharing"
sh = gc.open_by_url(sheetUrl)

# Pull every record of the "Supply Tag Level" tab into pandas,
# then mirror it as a Spark DataFrame (df_old) for the joins below
supply_tag_sheet = sh.worksheet_by_title("Supply Tag Level")
df = pd.DataFrame(supply_tag_sheet.get_all_records())
df_old = spark.createDataFrame(df)

# Load supply tag metadata from MySQL into Spark
supply_tags_sql = "SELECT `id`,`supply_tag_name`,`supply_tag_id_3p`,`supply_partner_id`,`active` from prog_supply_tags"
df_tag = sqlContext.createDataFrame(pd.read_sql_query(supply_tags_sql, conn))

# Load supply partner metadata, keeping only the columns used below
supply_partners_sql = "SELECT * from prog_supply_partners"
partners_pd = pd.read_sql_query(supply_partners_sql, conn)
df_partner = sqlContext.createDataFrame(partners_pd).select(
    "id", "supply_partner_name", "supply_partner_id_3p"
)

# Left-join every tag to its partner, then keep active tags only
partner_match = df_tag.supply_partner_id == df_partner.id
df_supply = df_tag.join(df_partner, partner_match, "left").filter(col('active') == 1)

# Columns that are compared against / written to the sheet
df_new = df_supply.select(
    "supply_partner_name",
    "supply_tag_name",
    "supply_tag_id_3p",
    "supply_partner_id_3p",
)

# Anti-join against the sheet: keep only tags whose ID is not listed there yet
not_in_sheet = df_new.join(
    df_old, df_new["supply_tag_id_3p"] == df_old["Supply Tag ID"], "leftanti"
)

# (column, substring) pairs marking test / unwanted entries; a row is dropped
# when any one of them matches (case-sensitive substring match)
excluded_markers = [
    ("supply_partner_name", "DC"),
    ("supply_partner_name", "Test"),
    ("supply_tag_name", "test"),
    ("supply_tag_name", "Test"),
    ("supply_tag_name", "SpringServe OM"),
]
is_excluded = None
for column_name, marker in excluded_markers:
    marker_hit = col(column_name).contains(marker)
    is_excluded = marker_hit if is_excluded is None else (is_excluded | marker_hit)

df = not_in_sheet.filter(~is_excluded)

# Tag <-> label link rows from MySQL, with soft-deleted links removed
tag_label_sql = "SELECT `supply_tag_id`,`is_deleted`,`supply_label_id` from prog_supply_label_supply_tag"
df_tag_label = sqlContext.createDataFrame(
    pd.read_sql_query(tag_label_sql, conn)
).filter(col("is_deleted") == 0)

# Supply tag ids / names (all tags, no `active` filter in this query)
tag_names_sql = "SELECT `id`,`supply_tag_id_3p`,`supply_tag_name` from prog_supply_tags"
df_tag = sqlContext.createDataFrame(pd.read_sql_query(tag_names_sql, conn))

# Supply label ids / names
labels_sql = "SELECT `id`,`supply_label_name` from prog_supply_labels"
df_label = sqlContext.createDataFrame(pd.read_sql_query(labels_sql, conn))

# Attach link rows to tags, then resolve each link to its label name.
# The second join condition deliberately references the pre-join
# df_tag_label / df_label frames, exactly as the columns were defined.
df_tag = df_tag.join(df_tag_label, df_tag_label.supply_tag_id == df_tag.id, "inner")
df_map = (
    df_label
    .join(df_tag, df_tag_label.supply_label_id == df_label.id, "inner")
    .select("supply_tag_id_3p", "supply_label_name")
)

# Add supply labels to the dataframe (inner join: tags without a label are dropped)
df = df.join(df_map, ["supply_tag_id_3p"], "inner")

# Materialise the sheet columns AND the label in a single Spark action.
# Collecting the labels and calling toPandas() separately would run the join
# twice, and Spark gives no guarantee that two actions over an unordered join
# return rows in the same order -- yet the label list is later paired with the
# pandas rows by position. One toPandas() keeps them aligned row-for-row.
# NOTE(review): a tag linked to several labels yields one row per label here;
# confirm whether such duplicates are intended in the sheet.
new_rows_pd = df.select(
    col("supply_partner_id_3p").alias("Supply Partner ID"),
    col("supply_partner_name").alias("Supply Partner"),
    col("supply_tag_id_3p").alias("Supply Tag ID"),
    col("supply_tag_name").alias("Supply Tag"),
    col("supply_label_name"),
).toPandas()

# Label of each new row, in the same order as the rows of `df` below
supply_label_list = new_rows_pd["supply_label_name"].tolist()

# Pandas DataFrame for the Google Sheets update (label column is not written)
df = new_rows_pd.drop(columns=["supply_label_name"])

# Determine the row position to update in Google Sheets: the cell in column A
# two past the number of existing records (presumably header row + 1-based
# row numbering -- TODO confirm against the sheet layout)
row_num = df_old.count() + 2
row_str = 'A' + str(row_num)

# Append the new rows below the existing data, without writing a header row.
# Any error other than InvalidArgumentValue now propagates unchanged: the old
# bare `except: raise Exception` replaced the real error (and even
# KeyboardInterrupt) with an empty, uninformative Exception.
try:
    sh.worksheet_by_title("Supply Tag Level").set_dataframe(df, row_str, copy_head=False)
except pygsheets.InvalidArgumentValue as exc:
    # Deliberate best-effort, kept from the original; presumably raised when
    # there are no new rows to write -- TODO confirm. Reported, not silent.
    print("Sheet append skipped (InvalidArgumentValue):", exc)

# Switch to a gspread client (same service-account key, same spreadsheet URL)
# because the batch update below goes through gspread's Spreadsheet object
service_account_file = "/home/alpha/notebook/Vatsa/Domain_to_brand/client_secret_new.json"
gc = gspread.service_account(filename=service_account_file)
sh = gc.open_by_url(sheetUrl)

# Build the per-row boolean cell values for the Google Sheets batch update.
# One entry per new row; each entry holds one cell per sheet column from
# index 4 up to the last column of the existing sheet data.
flag_columns = range(4, len(df_old.columns))
valueList = []
for row_idx in range(df.shape[0]):
    # Rows whose label contains "O&O" get columns 5, 8, 11 and 12 set to True;
    # every other row gets only column 5 set to True
    if "O&O" in supply_label_list[row_idx]:
        true_columns = [5, 8, 11, 12]
    else:
        true_columns = [5]
    row_cells = [
        {"userEnteredValue": {"boolValue": column_idx in true_columns}}
        for column_idx in flag_columns
    ]
    valueList.append({"values": row_cells})

# Define batch update requests for Google Sheets.
# df_old.count() is a Spark action; compute it once instead of three times
# inside the request literal.
existing_row_count = df_old.count()
first_new_row_index = existing_row_count + 1            # start index of the new rows
end_new_row_index = first_new_row_index + df.shape[0]   # end index (exclusive per GridRange)

requests = {"requests": [
    {
        # Apply BOOLEAN data validation to the new rows, from column index 4
        # through the last column of the existing sheet data
        "repeatCell": {
            "cell": {"dataValidation": {"condition": {"type": "BOOLEAN"}}},
            "range": {
                # NOTE(review): sheetId "0" is hardcoded; presumably the
                # "Supply Tag Level" tab -- confirm it matches that tab's gid
                "sheetId": "0",
                "startRowIndex": first_new_row_index,
                "endRowIndex": end_new_row_index,
                "startColumnIndex": 4,
                "endColumnIndex": len(df_old.columns),
            },
            "fields": "dataValidation"
        }
    },
    {
        # Write the prepared boolean values into the same block of cells
        "updateCells": {
            "rows": valueList,
            "start": {"rowIndex": first_new_row_index, "columnIndex": 4, "sheetId": "0"},
            "fields": "userEnteredValue"
        }
    }
]}

# Execute batch update in Google Sheets (last expression: the API reply is displayed)
sh.batch_update(requests)


Connection Established


{'spreadsheetId': '1YIN7zyvB6ylST8b571uFB3rzwXX-XNY8J31bX6OmSpo',
 'replies': [{}, {}]}