In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.functions import udf
import re
from pyspark.sql.functions import col, regexp_replace, lower
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, MapType, StructType, StructField, IntegerType, ArrayType, FloatType,LongType
from pyspark.sql.utils import AnalysisException
import json
from datetime import datetime
import time
from pyspark.sql.functions import lit
from pyspark.sql import Row
from pyspark.sql.functions import monotonically_increasing_id, concat, lit, current_timestamp



In [0]:
start_time = time.time()


In [0]:

storage_account_name = "storagereviews"
container_us = "reviews-us"
container_uk = "reviews-uk"
container_tr = "reviews-tr"
container_fr = "reviews-fr"
container_it = "reviews-it"
container_gr = "reviews-gr"  
container_ind = "reviews-ind"
container_br = "reviews-br"
account_key = ""

spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", account_key)

us_reviews_path = f"wasbs://{container_us}@{storage_account_name}.blob.core.windows.net"
uk_reviews_path = f"wasbs://{container_uk}@{storage_account_name}.blob.core.windows.net"
tr_reviews_path = f"wasbs://{container_tr}@{storage_account_name}.blob.core.windows.net"
fr_reviews_path = f"wasbs://{container_fr}@{storage_account_name}.blob.core.windows.net"
it_reviews_path = f"wasbs://{container_it}@{storage_account_name}.blob.core.windows.net"
gr_reviews_path = f"wasbs://{container_gr}@{storage_account_name}.blob.core.windows.net"
ind_reviews_path = f"wasbs://{container_ind}@{storage_account_name}.blob.core.windows.net"
br_reviews_path = f"wasbs://{container_br}@{storage_account_name}.blob.core.windows.net"



In [0]:
schema = StructType([
    StructField("author", StringType(), True),
    StructField("body", StringType(), True),
    StructField("region", StringType(), True),
    StructField("reviewDate", StringType(), True),
    StructField("title", StringType(), True)
])

def load_json_or_empty(path):
    try:
        df = spark.read.option("mode", "PERMISSIVE").option("multiline", "true").json(path)
        if df.count() == 0:  # Si le DataFrame est vide
            print(f"Aucun fichier trouvé dans {path}, DataFrame vide renvoyé.")
            return spark.createDataFrame([], df.schema)  # Retourner un DataFrame vide avec la même structure
        return df
    except AnalysisException:
        print(f"Le conteneur {path} est vide ou n'existe pas, DataFrame vide renvoyé.")
        return spark.createDataFrame([], schema)  # Retourner un DataFrame vide avec un schéma de base

# Charger les JSONs de chaque conteneur si disponible
df_us = load_json_or_empty(us_reviews_path)
df_uk = load_json_or_empty(uk_reviews_path)
df_tr = load_json_or_empty(tr_reviews_path)
df_fr = load_json_or_empty(fr_reviews_path)
df_it = load_json_or_empty(it_reviews_path)
df_gr = load_json_or_empty(gr_reviews_path)
df_ind = load_json_or_empty(ind_reviews_path)
df_br = load_json_or_empty(br_reviews_path)


In [0]:
df_us = df_us.withColumn("region", when(df_us["region"] == "Région non trouvée", "The United States").otherwise(df_us["region"]))
df_uk = df_uk.withColumn("region", when(df_uk["region"] == "Région non trouvée", "United Kingdom").otherwise(df_uk["region"]))
df_tr = df_tr.withColumn("region", when(df_tr["region"] == "Région non trouvée", "Turkey").otherwise(df_tr["region"]))
df_fr = df_fr.withColumn("region", when(df_fr["region"] == "Région non trouvée", "France").otherwise(df_fr["region"]))
df_it = df_it.withColumn("region", when(df_it["region"] == "Région non trouvée", "Italy").otherwise(df_it["region"]))
df_gr = df_gr.withColumn("region", when(df_gr["region"] == "Région non trouvée", "Germany").otherwise(df_gr["region"]))
df_ind = df_ind.withColumn("region", when(df_ind["region"] == "Région non trouvée", "India").otherwise(df_ind["region"]))
df_br = df_br.withColumn("region", when(df_br["region"] == "Région non trouvée", "Brazil").otherwise(df_br["region"]))



In [0]:
df_all = df_us.union(df_uk).union(df_tr).union(df_fr).union(df_it).union(df_gr).union(df_ind).union(df_br)


In [0]:
def check_duplicates(df):
    total_count = df.count()
    unique_count = df.distinct().count()
    if total_count != unique_count:
        raise ValueError("Failed: Duplicates found in the dataset")
    return {"duplicates": total_count - unique_count}


def check_missing_values(df, columns):
    missing_values_report = {}
    for col in columns:
        missing_count = df.filter(df[col].isNull()).count()
        missing_values_report[col] = missing_count
        if missing_count > 0:
            raise ValueError(f"Failed: Missing values in column {col}")
    return missing_values_report

def check_data_types(df, columns_types):
    data_type_report = {}
    for col, expected_type in columns_types.items():
        actual_type = df.schema[col].dataType
        if not isinstance(actual_type, expected_type):
            raise ValueError(f"Failed: Invalid data type in column {col}")
        data_type_report[col] = str(actual_type)
    return data_type_report

def check_rating_range(df, column):
    invalid_ratings = df.filter((df[column] < 1) | (df[column] > 5)).count()
    if invalid_ratings > 0:
        raise ValueError(f"Failed: Invalid values in rating column")
    return {"invalid_ratings": invalid_ratings}


def generate_quality_report(missing_values,duplicates_report, data_type_after_extraction_report,data_type_after_transformation_report, rating_report):
    report = {
        "missing_values": missing_values,
        "duplicates": duplicates_report,
        "data_types_one": data_type_after_extraction_report,
        "data_types_two": data_type_after_transformation_report,
        "rating": rating_report,
        "status": "success"
    }
    return json.dumps(report, indent=4)



In [0]:
expected_types = {
    "author": StringType,
    "body": StringType,
    "region": StringType,
    "reviewDate": StringType,
    "title": StringType
}
try:
    data_type_after_extraction_report = check_data_types(df_all, expected_types)
except ValueError as e:
    print(f"Data quality test failed: {str(e)}")
    # Exit with error for ADF
    raise

In [0]:
df_all= df_all.dropDuplicates()


In [0]:
try:
    duplicates_report = check_duplicates(df_all)
except ValueError as e:
    # Log error message
    print(f"Data quality test failed: {str(e)}")
    # Exit with error for ADF
    raise


In [0]:
# Drop rows where 'body' is missing (null)
df_all= df_all.na.drop(subset=["body"])
df_all= df_all.na.drop(subset=["title"])

In [0]:
columns_to_check = ['body','title'] 
try:
    missing_values_report = check_missing_values(df_all, columns_to_check)

except ValueError as e:
    # Log error message
    print(f"Data quality test failed: {str(e)}")
    # Exit with error for ADF
    raise


In [0]:


# UDF to extract all numbers from the string and return the smallest one, excluding zero
def extract_rating(title):
    # Extract all numbers from the string (both integers and decimals)
    numbers = re.findall(r'\d+\.\d+|\d+', title)  # This will capture both integers and decimals (e.g., 5, 5.0)
    
    if numbers:
        # Convert extracted numbers to float and filter out zeros
        filtered_numbers = [float(num) for num in numbers if float(num) > 0]

        if filtered_numbers:
            # Return the smallest number
            return min(filtered_numbers)
    return None  # If no valid rating found, return None

extract_rating_udf = udf(extract_rating, FloatType())


df_all = df_all.withColumn(
    "rating", 
    extract_rating_udf(df_all["title"]).cast(StringType())  
)


In [0]:
try:
    rating_report = check_rating_range(df_all, 'rating')
except ValueError as e:
    # Log error message
    print(f"Data quality test failed: {str(e)}")
    # Exit with error for ADF 
    dbutils.notebook.exit(f'{{"status": "failed", "reason": "{str(e)}"}}')  

In [0]:
container="data-quality-reports"
max_id_file_path = f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/last_max_id.txt"
try:
    max_id_content = dbutils.fs.head(max_id_file_path
    )
    last_max_id = int(max_id_content)
    print(f"Reprise avec l'ID {last_max_id}")
except Exception as e:
    last_max_id = 0
    print(f"Aucun ID trouvé, démarrage à 0. Erreur: {e}")

df_all = df_all.withColumn("unique_id", monotonically_increasing_id() + last_max_id)

df_all.show()

new_max_id = df_all.agg({"unique_id": "max"}).collect()[0][0]
dbutils.fs.put(max_id_file_path, str(new_max_id), overwrite=True)



Reprise avec l'ID 1018
+--------------------+--------------------+-----------------+------------------+------------------+------+---------+
|              author|                body|           region|        reviewDate|             title|rating|unique_id|
+--------------------+--------------------+-----------------+------------------+------------------+------+---------+
|  Todd A. Gengenbach|This phone was so...|the United States|  December 3, 2024|5.0 out of 5 stars|   5.0|     1018|
|              Nicole|I recently purcha...|the United States|   August 19, 2024|5.0 out of 5 stars|   5.0|     1019|
|          Cori Mills|Great shape. Good...|the United States|  October 14, 2024|5.0 out of 5 stars|   5.0|     1020|
|  Nathan E. Procknow|        Great seller|the United States| November 13, 2024|5.0 out of 5 stars|   5.0|     1021|
|        Carlos Ochoa|El teléfono es ex...|the United States|      May 22, 2024|5.0 out of 5 stars|   5.0|     1022|
|            Madeline|Upgraded from my ..

In [0]:
%pip install googletrans==4.0.0-rc1

Collecting googletrans==4.0.0-rc1
  Downloading googletrans-4.0.0rc1.tar.gz (20 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting httpx==0.13.3 (from googletrans==4.0.0-rc1)
  Obtaining dependency information for httpx==0.13.3 from https://files.pythonhosted.org/packages/54/b4/698b284c6aed4d7c2b4fe3ba5df1fcf6093612423797e76fbb24890dd22f/httpx-0.13.3-py3-none-any.whl.metadata
  Downloading httpx-0.13.3-py3-none-any.whl.metadata (25 kB)
Collecting hstspreload (from httpx==0.13.3->googletrans==4.0.0-rc1)
  Obtaining dependency information for hstspreload from https://files.pythonhosted.org/packages/38/be/095f4109e70cb2f66b8136ee01fd4380672560b521a64db2fd0e7d44c731/hstspreload-2024.12.1-py3-none-any.whl.metadata
  Downloading hstspreload-2024.12.1-py3-none-any.whl.metadata (2.1 kB)
Collecting sniffio (from httpx==0.13.3->googletrans==4.0.0-rc1)
  Obtaining dependency information for sniffio from https://files.pythonhosted.o

In [0]:
from googletrans import Translator

def translate_review(review):
    translator = Translator()
    
    if review:
        try:
            # Detect if the review is in English
            detected_lang = translator.detect(review).lang
            if detected_lang != 'en':
                # Translate to English if the detected language is not English
                translated = translator.translate(review, src=detected_lang, dest='en').text
                return translated
            else:
                return review
        except Exception as e:
            # If translation fails, return the original review and print the error
            print(f"Error translating review: {e}")
            return review
    return None

translate_review_udf = udf(translate_review, StringType())

df_all= df_all.withColumn("cleaned_reviews", translate_review_udf(df_all["body"]))



In [0]:

# Remove non-alphabet characters and convert text to lowercase
df_all = df_all.withColumn(
    "cleaned_reviews", 
    lower(regexp_replace(col("cleaned_reviews"), "[^a-zA-Z\\s]", ""))  # Remove non-alphabet characters and convert to lowercase
)




In [0]:
%pip install transformers

Collecting transformers
  Obtaining dependency information for transformers from https://files.pythonhosted.org/packages/f2/3a/8bdab26e09c5a242182b7ba9152e216d5ab4ae2d78c4298eb4872549cd35/transformers-4.47.1-py3-none-any.whl.metadata
  Downloading transformers-4.47.1-py3-none-any.whl.metadata (44 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/44.1 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.1/44.1 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.24.0 (from transformers)
  Obtaining dependency information for huggingface-hub<1.0,>=0.24.0 from https://files.pythonhosted.org/packages/61/8c/fbdc0a88a622d9fa54e132d7bf3ee03ec602758658a2db5b339a65be2cfe/huggingface_hub-0.27.0-py3-none-any.whl.metadata
  Downloading huggingface_hub-0.27.0-py3-none-any.whl.metadata (13 kB)
Collecting regex!=2019.12.17 (from transformers)
  Obtaining dependency information for regex!=2019.12.1

In [0]:
%pip install torch


Collecting torch
  Obtaining dependency information for torch from https://files.pythonhosted.org/packages/d1/35/e8b2daf02ce933e4518e6f5682c72fd0ed66c15910ea1fb4168f442b71c4/torch-2.5.1-cp311-cp311-manylinux1_x86_64.whl.metadata
  Downloading torch-2.5.1-cp311-cp311-manylinux1_x86_64.whl.metadata (28 kB)
Collecting networkx (from torch)
  Obtaining dependency information for networkx from https://files.pythonhosted.org/packages/b9/54/dd730b32ea14ea797530a4479b2ed46a6fb250f682a9cfb997e968bf0261/networkx-3.4.2-py3-none-any.whl.metadata
  Downloading networkx-3.4.2-py3-none-any.whl.metadata (6.3 kB)
Collecting jinja2 (from torch)
  Obtaining dependency information for jinja2 from https://files.pythonhosted.org/packages/bd/0f/2ba5fbcd631e3e88689309dbe978c5769e883e4b84ebfe7da30b43275c5a/jinja2-3.1.5-py3-none-any.whl.metadata
  Downloading jinja2-3.1.5-py3-none-any.whl.metadata (2.6 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Obtaining dependency information for nvidia-cud

In [0]:
%pip install vaderSentiment


Collecting vaderSentiment
  Obtaining dependency information for vaderSentiment from https://files.pythonhosted.org/packages/76/fc/310e16254683c1ed35eeb97386986d6c00bc29df17ce280aed64d55537e9/vaderSentiment-3.3.2-py2.py3-none-any.whl.metadata
  Downloading vaderSentiment-3.3.2-py2.py3-none-any.whl.metadata (572 bytes)
Downloading vaderSentiment-3.3.2-py2.py3-none-any.whl (125 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/126.0 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m126.0/126.0 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: vaderSentiment
Successfully installed vaderSentiment-3.3.2
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:


from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer


sia = SentimentIntensityAnalyzer()

def get_sentiment(text):
    if text is None:  # Handle cases where the text is None
        return 'Neutral'
    sentiment_score = sia.polarity_scores(text)
    if sentiment_score['compound'] >= 0.05:
        return 'Positive'
    elif sentiment_score['compound'] <= -0.05:
        return 'Negative'
    else:
        return 'Neutral'

sentiment_udf_vader = udf(get_sentiment, StringType())

# Apply the VADER sentiment analysis to the 'body' column
df_all = df_all.withColumn("sentiment", sentiment_udf_vader(df_all["cleaned_reviews"]))


In [0]:

from transformers import pipeline

# Initialize the sentiment analysis pipeline from Hugging Face 
sentiment_analyzer = pipeline("sentiment-analysis")

# Define feature keywords
features_keywords = {
    "camera": ["camera", "photos", "picture quality", "lens", "zoom", "shot"],
    "battery": ["battery", "battery life", "charge", "charging", "power", "drains", "recharge"],
    "screen": ["screen", "display", "touch", "resolution", "size", "pixels"],
    "performance": ["performance", "processor", "speed", "lag", "freeze", "slow", "smooth"],
    "design": ["design", "look", "feel", "build quality", "aesthetic", "style", "color", "material"],
    "price": ["price", "expensive", "value", "cost", "cheap", "affordable", "worth"],
    "software": ["software", "OS", "operating system", "update", "bug", "interface", "UI", "UX"],
    "support": ["support", "customer service", "help", "assistance", "response time", "issue", "problem"],
    "durability": ["durability", "sturdy", "break", "crack", "resilience", "lasting", "damage", "tough"],
    "connectivity": ["Wi-Fi", "Bluetooth", "connection", "signal", "network", "drop", "disconnect"],
    "charging": ["charging time", "fast charge", "charge speed", "slow charge"],
    "ease_of_use": ["easy", "simple", "intuitive", "user-friendly", "complicated"],
    "features": ["features", "functionality", "options", "useful", "customizable"],
    "speed": ["speed", "fast", "slow", "lag", "responsive"],
    "sound": ["sound", "audio", "bass", "clarity", "volume", "speakers", "headphones"],
    "packaging": ["packaging", "box", "unboxing", "presentation"],
    "size_weight": ["size", "weight", "light", "heavy", "compact", "bulky"],
    "security": ["security", "privacy", "fingerprint", "face ID", "data", "encryption"],
    "compatibility": ["compatible", "sync", "integration", "cross-platform"],
    "brightness": ["brightness", "light", "visibility", "contrast", "night mode"],
}


# Function to split sentences on conjunctions and other separators
def split_review_by_conjunction(review):
    if review is None:  # Check if review is None
        return []  # Return an empty list if review is None
    # Split by common conjunctions or separators (e.g., 'but', 'and', commas)
    separators = r'[,.!?;]|\b(but|and|so|because|although)\b'
    # Filter out empty sentences after splitting and stripping
    return [sentence.strip() for sentence in re.split(separators, review) if sentence and sentence.strip() != ""]

# Function to perform sentiment analysis
def analyze_sentiment(sentence):
    sentiment = sentiment_analyzer(sentence)[0]['label']
    return sentiment

sentiment_udf = udf(analyze_sentiment, StringType())

# Function to extract feature-related sentences and analyze sentiment
def analyze_sentiment_for_review_spark(df, feature_keywords, review_column_name):

    def feature_sentiment_udf(review):
        feature_sentiment = {feature: {'positive': 0, 'negative': 0} for feature in feature_keywords}
        sentences = split_review_by_conjunction(review)
        for sentence in sentences:
            for feature, keywords in feature_keywords.items():
                if any(keyword.lower() in sentence.lower() for keyword in keywords):
                    sentiment = analyze_sentiment(sentence)
                    if sentiment == "POSITIVE":
                        feature_sentiment[feature]['positive'] += 1
                    elif sentiment == "NEGATIVE":
                        feature_sentiment[feature]['negative'] += 1
        return feature_sentiment

    # Register UDF for feature-based sentiment analysis
    feature_sentiment_spark_udf = udf(feature_sentiment_udf, StringType())

    # Apply the feature sentiment analysis function to the review column
    df_sentiment = df.withColumn("feature_sentiment", feature_sentiment_spark_udf(df[review_column_name]))

    return df_sentiment

review_column_name = "cleaned_reviews"  
df_all = analyze_sentiment_for_review_spark(df_all, features_keywords, review_column_name)



No model was supplied, defaulted to distilbert/distilbert-base-uncased-finetuned-sst-2-english and revision 714eb0f (https://huggingface.co/distilbert/distilbert-base-uncased-finetuned-sst-2-english).
Using a pipeline without specifying a model name and revision in production is not recommended.


config.json:   0%|          | 0.00/629 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Device set to use cpu


In [0]:

expected_types = {
    "author": StringType,
    "body": StringType,
    "region": StringType,
    "reviewDate": StringType,
    "title": StringType,
    "rating": StringType,
    "unique_id": LongType,
    "cleaned_reviews": StringType,
    "sentiment": StringType,
    "feature_sentiment": StringType,
}


try:
    data_type_after_transformation_report = check_data_types(df_all, expected_types)
except ValueError as e:
    # Log error message
    print(f"Data quality test failed: {str(e)}")
    # Exit with error for ADF
    raise

In [0]:
quality_report = generate_quality_report(missing_values_report, duplicates_report, data_type_after_extraction_report,data_type_after_transformation_report, rating_report)


In [0]:
df = df_all.select("unique_id", "feature_sentiment")

def extract_features_sentiment(feature_sentiment):
    results = []
    for feature, sentiments in feature_sentiment.items():
        if sentiments['positive'] > 0:
            results.append((feature, 'positive'))
        elif sentiments['negative'] > 0:
            results.append((feature, 'negative'))
    return results

extract_features_sentiment_udf = F.udf(extract_features_sentiment, ArrayType(StructType([
    StructField('feature', StringType(), True),
    StructField('sentiment', StringType(), True)
])))

# Apply the UDF to the DataFrame to create a new column with exploded features and sentiment
df_with_exploded_features = df.withColumn("exploded_features", extract_features_sentiment_udf(df["feature_sentiment"]))

# Explode the list of (feature, sentiment) into separate rows
df_exploded = df_with_exploded_features.withColumn("feature_sentiment_exploded", F.explode("exploded_features"))

# Split the struct column into 'feature' and 'sentiment'
df_features = df_exploded.select(
    "unique_id",
    F.col("feature_sentiment_exploded.feature").alias("feature"),
    F.col("feature_sentiment_exploded.sentiment").alias("sentiment")
)

In [0]:
from pyspark.sql.functions import col
df_reviews_final = df_all.select(
    col("unique_id").alias("review_id").cast("string"), 
    col("sentiment").alias("sentiment_global").cast("string"),
    col("region").alias("region").cast("string"),
    col("rating").alias("rating").cast("string")
    )

df_features_final = df_features.select(
    col("unique_id").alias("review_id").cast("string"), 
    col("feature").cast("string"),
    col("sentiment").cast("string")
)

jdbc_url = f"jdbc:sqlserver://server-reviews.database.windows.net:1433;database=reviews_db"
connection_properties = {
    "user" : "admin_db",
    "password" : "",
    "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}




In [0]:
df_reviews_final.write.jdbc(url=jdbc_url, table="reviews", mode="append", properties=connection_properties)

In [0]:
df_features_final.write.jdbc(url=jdbc_url, table="features", mode="append", properties=connection_properties)

In [0]:
processing_duration = time.time() - start_time
processing_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

df_all = df_all.withColumn("processing_date", lit(processing_date))
df_all = df_all.withColumn("processing_duration", lit(processing_duration))


In [0]:
quality_report

'{\n    "missing_values": {\n        "body": 0,\n        "title": 0\n    },\n    "duplicates": {\n        "duplicates": 0\n    },\n    "data_types_one": {\n        "author": "StringType()",\n        "body": "StringType()",\n        "region": "StringType()",\n        "reviewDate": "StringType()",\n        "title": "StringType()"\n    },\n    "data_types_two": {\n        "author": "StringType()",\n        "body": "StringType()",\n        "region": "StringType()",\n        "reviewDate": "StringType()",\n        "title": "StringType()",\n        "rating": "StringType()",\n        "unique_id": "LongType()",\n        "cleaned_reviews": "StringType()",\n        "sentiment": "StringType()",\n        "feature_sentiment": "StringType()"\n    },\n    "rating": {\n        "invalid_ratings": 0\n    },\n    "status": "success"\n}'

In [0]:
container_name = "processed-data"

spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", account_key)

output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/final_reviews_{processing_date.replace(' ', '_').replace(':', '-')}"

df_all.write.parquet(output_path, mode="overwrite")

In [0]:

container_name="data-quality-reports"
report_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/data_quality_report_{processing_date.replace(' ', '_').replace(':', '-')}.json"



dbutils.fs.put(report_path, quality_report, overwrite=True)

Wrote 795 bytes.


True

In [0]:
dbutils.notebook.exit(quality_report)
