# Data discovery through Bigquery


In [1]:
'''
    WARNING CONTROL to display or ignore all warnings
'''
import warnings; warnings.simplefilter('ignore')     #switch betweeb 'default' and 'ignore'
import traceback

''' Set debug flag to view extended error messages; else set it to False to turn off debugging mode '''
debug = True


## Instantiate Classes

In [2]:
import os
import sys

proj_dir = os.path.abspath(os.pardir)
sys.path.insert(1,proj_dir.split('wrangler/')[0])
from rezaware.modules.etl.loader import sparkRDBM as db

''' restart initiate classes '''
if debug:
    import importlib
    db = importlib.reload(db)

_jar = os.path.join(os.getenv('SPARK_HOME'),
                    'jars',
                    'spark-3.4-bigquery-0.41.1.jar')

__desc__ = "read and write files from and to BigQuery database"
clsSDB = db.dataWorkLoads(
    desc=__desc__,
    db_type = 'BigQuery',
    db_driver=None,
    db_hostIP=None,
    db_port = None,
    db_name = None,
    db_schema='facebook_ads_combined_data',
    spark_partitions=None,
    spark_format = 'bigquery',
    spark_save_mode=None,
    # spark_jar_dir = _jar,
)
if clsSDB.session:
    clsSDB._session.stop
print("\n%s class initialization and load complete!" % __desc__)

All functional __PROPATTR__-libraries in LOADER-package of ETL-module imported successfully!
All functional SPARKRDBM-libraries in LOADER-package of ETL-module imported successfully!
All functional SPARKRDBM-libraries in LOADER-package of ETL-module imported successfully!
All functional APP-libraries in REZAWARE-package of REZAWARE-module imported successfully!
__propAttr__ Class initialization complete


25/01/01 13:48:07 WARN Utils: Your hostname, Waidy-Think-Three resolves to a loopback address: 127.0.1.1; using 192.168.1.100 instead (on interface wlp3s0)
25/01/01 13:48:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/01/01 13:48:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).



read and write files from and to BigQuery database class initialization and load complete!


## Load data

In [3]:
# query = """
# SELECT *
# FROM `fresh-deck-445306-b0.facebook_ads_combined_data.combine__metadata_ad_account`
# LIMIT 1000
# """
query = """
SELECT 
    c.id AS campaign_id,
    c.name AS campaign_name,
    c.objective AS campaign_objective,
    c.daily_budget AS campaign_daily_budget,
    c.lifetime_budget AS campaign_lifetime_budget,
    a.id AS adset_id,
    a.name AS adset_name,
    a.daily_budget AS adset_budget,
    a.targeting AS adset_targeting,
    ad.id AS ad_id,
    ad.name AS ad_name,
    ac.object_type AS creative_type,
    ac.object_url  AS creatives_url,        
    ac.name AS creatives_name,
    ac.title AS creatives_title,
    CAST(i.impressions AS INT64) AS impressions,
    CAST(i.clicks AS INT64) AS  clicks,
    i.estimated_conversions AS estimated_conversions,
    i.spend AS spend ,
    ROUND(CAST(i.clicks AS INT64) / NULLIF(CAST(i.impressions AS INT64), 0), 4) AS ctr,
    ROUND(CAST(i.spend  AS FLOAT64)/ NULLIF(CAST(i.estimated_conversions  AS INT64), 0), 2) AS est_cpa
FROM 
    `fresh-deck-445306-b0.combined_data_facebook_ads.combine__metadata_campaign` c
JOIN 
   `fresh-deck-445306-b0.combined_data_facebook_ads.combine__metadata_adset` a
ON 
    c.id = a.campaign_id
JOIN 
    `fresh-deck-445306-b0.combined_data_facebook_ads.combine__metadata_ad`ad
ON 
    a.id = ad.adset_id
JOIN 
    `fresh-deck-445306-b0.combined_data_facebook_ads.combine__metadata_ad_creatives` ac
ON 
    JSON_EXTRACT_SCALAR(ad.creative, "$.id") = ac.id
JOIN 
    `fresh-deck-445306-b0.combined_data_facebook_ads.cl_combine_airbyte_raw_act__ads_insights` i
ON 
    ad.id = i.ad_id
LIMIT 1000
"""
sdf = clsSDB.read_data_from_table(
    select = query,
    # db_table="",
    # db_column="",
    # lower_bound=None,
    # upper_bound=None,
)
print("Loaded %d rows" % sdf.count())
sdf.printSchema()

[Error]sparkRDBM function <read_data_from_table> Some of types cannot be determined after inferring


AttributeError: 'NoneType' object has no attribute 'count'

In [4]:
sdf.show(n=5)

+-----------------+----------+--------------------+-----------------+--------------------+---------+--------------------+
|         adset_id|adset_name|     adset_targeting|            ad_id|             ad_name|ad_status|            creative|
+-----------------+----------+--------------------+-----------------+--------------------+---------+--------------------+
|23849492440140708|        WW|{"age_max": 65, "...|23849492440160708|  Tranceportal Radio|   ACTIVE|{"id": "238494924...|
|23849500590990708|        WW|{"age_max": 65, "...|23849500591010708|interview with sc...|   ACTIVE|{"id": "238495005...|
|23849542473360708|        WW|{"age_max": 65, "...|23849542473370708|       Techno Viking|   ACTIVE|{"id": "238495424...|
|23849635080550708|        WW|{"age_max": 65, "...|23849635080560708|Tranceportal Radi...|   ACTIVE|{"id": "238496350...|
|23849650474420708|        WW|{"age_max": 65, "...|23849650474400708|David Forbes - Ce...|   ACTIVE|{"id": "238496504...|
+-----------------+-----

## Profiling

In [4]:
from ydata_profiling import ProfileReport
_cols = sdf.columns
# _cols = ["ad_name","type","status","name","post","body","thumbnail_url","post_1"]
rpt_sdf_ = sdf.select(*_cols)

_fname = "combine_all_with_aggregates"
report = ProfileReport(
    rpt_sdf_, 
    title=f"Profiling {_fname}",
    infer_dtypes=False,
    interactions=None,
    missing_diagrams=None,
    correlations={
        "auto": {"calculate": True},
        "pearson": {"calculate": False},
        "spearman": {"calculate": False},
        "phi_k" : {"calculate": False},
    },
)
_fpath = os.path.join(proj_dir.split('wrangler/')[0],'wrangler/data/budget/etl/profiling',_fname)
report.to_file(_fpath)

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

25/01/01 13:36:17 WARN CacheManager: Asked to cache already cached data.        
25/01/01 13:36:20 WARN CacheManager: Asked to cache already cached data.        
25/01/01 13:36:37 WARN CacheManager: Asked to cache already cached data.        
25/01/01 13:36:38 WARN CacheManager: Asked to cache already cached data.
25/01/01 13:36:38 WARN CacheManager: Asked to cache already cached data.
25/01/01 13:36:38 WARN CacheManager: Asked to cache already cached data.
25/01/01 13:36:38 WARN CacheManager: Asked to cache already cached data.
25/01/01 13:36:39 WARN CacheManager: Asked to cache already cached data.        
25/01/01 13:36:39 WARN CacheManager: Asked to cache already cached data.
25/01/01 13:36:39 WARN CacheManager: Asked to cache already cached data.
25/01/01 13:36:40 WARN CacheManager: Asked to cache already cached data.
25/01/01 13:36:40 WARN CacheManager: Asked to cache already cached data.
                                                                                

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

## Null Counts

In [2]:
import os
from pyspark.sql import SparkSession

# Set credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/nuwan/.config/gcloud/legacy_credentials/samana.thetha@gmail.com/adc.json"

# Create Spark session with specific JAR configurations
# spark = SparkSession.builder \
#     .appName("bigquery-test") \
#     .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.32.2") \
#     .config("spark.driver.extraClassPath", "/opt/spark_hadoop_3/jars/*") \
#     .config("spark.executor.extraClassPath", "/opt/spark_hadoop_3/jars/*") \
#     .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
#     .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", 
#             "/home/nuwan/.config/gcloud/legacy_credentials/samana.thetha@gmail.com/adc.json") \
#     .getOrCreate()
spark = SparkSession.builder \
    .appName("bigquery-test") \
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.32.2") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", 
            "/home/nuwan/.config/gcloud/legacy_credentials/samana.thetha@gmail.com/adc.json") \
    .getOrCreate()

# Additional configuration
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "facebook_ads_combined_data")

# Read the table
df_spark = spark.read \
    .format("bigquery") \
    .option("project", "fresh-deck-445306-b0") \
    .option("table", "fresh-deck-445306-b0.facebook_ads_combined_data.combine__metadata_ad_account") \
    .load()

# Show the data
df_spark.show(5)

Py4JJavaError: An error occurred while calling o47.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
	at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:582)
	at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:804)
	at java.base/java.util.ServiceLoader$ProviderImpl.get(ServiceLoader.java:722)
	at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1393)
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
	at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
	at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:629)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.lang.NoClassDefFoundError: scala/collection/StringOps$
	at com.google.cloud.spark.bigquery.BigQueryUtilScala$.validateScalaVersionCompatibility(BigQueryUtil.scala:37)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:42)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:49)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:78)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:780)
	... 31 more
