In [None]:
!pip install pyspark
!pip install --upgrade google-cloud-bigtable

In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("Web Content Indexing")
    .getOrCreate()
)

# Load the JSON records from `part-r-00000` and preview the first rows.
json_df = spark.read.json("part-r-00000")
json_df.show()

+--------------------+--------------------+--------------------+-----------------+-----------------+--------------------+------------+--------------------+----------+-------------+--------------------+
|           fetchTime|            metadata|        modifiedTime|retriesSinceFetch|retryIntervalDays|retryIntervalSeconds|       score|           signature|statusCode|   statusName|                 url|
+--------------------+--------------------+--------------------+-----------------+-----------------+--------------------+------------+--------------------+----------+-------------+--------------------+
|Mon Nov 13 21:20:...|{NULL, NULL, NULL...|Thu Jan 01 07:30:...|                0|               30|             2592000|4.0064103E-4|                null|         1| db_unfetched|  http://www.3u.com/|
|Wed Dec 13 21:00:...|{text/html, temp_...|Mon Nov 13 21:00:...|                0|               30|             2592000| 0.106155306|                null|         4|db_redir_temp|   https://a

In [None]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

# Clean the raw records: remove exact duplicate rows, then every row that
# holds a null in any column.
# NOTE(review): na.drop() without a subset checks all columns, not only the
# `url` and `score` columns used below - confirm that is intended.
cleaned_json_df = json_df.dropDuplicates().na.drop()

# Tokenizer lower-cases `url` and splits it on whitespace into `keywords`.
# The result is bound to a new name instead of back to `json_df`: transforming
# a frame that already contains the output column raises an error, which made
# this cell fail whenever it was executed a second time.
tokenizer = Tokenizer(inputCol="url", outputCol="keywords")
tokenized_json_df = tokenizer.transform(cleaned_json_df)

# Rank rows by descending score (equal scores share a rank).
# There is no partitionBy, so Spark evaluates this window over the whole
# dataset in a single partition.
windowSpec = Window.orderBy(col("score").desc())
ranked_json_df = tokenized_json_df.withColumn("rank", rank().over(windowSpec))

# Keep only the columns that make up the index.
indexed_json_df = ranked_json_df.select("url", "keywords", "rank")

# Overwrite any previous result: the default save mode raises an error when
# `output` already exists, so the notebook could not be re-run.
# Part file names inside `output` change on every write, so consumers should
# load the `output` directory rather than one specific part file.
indexed_json_df.write.mode("overwrite").parquet("output")

In [None]:
# Read the whole `output` dataset instead of a single part file: part file
# names carry a random identifier that changes on every write, so a hardcoded
# name only works on the machine and run that produced it.
parquet_df = spark.read.parquet("output")
parquet_df.show()

In [None]:
import os
from google.cloud import bigtable
from google.oauth2 import service_account

project_id = "web-content-indexing"
instance_id = "data-engineering-project"
table_id = "crawled-links"

# Point at the Parquet dataset directory rather than one part file: part file
# names carry a random identifier that changes on every write.
parquet_file = "output"

# Path to the service account JSON key file
credentials_path = "web-content-indexing-78a6a13bc8de.json"

# Expose the key through GOOGLE_APPLICATION_CREDENTIALS as well, so that any
# Google client created without explicit credentials picks up the same key.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path

# Create a Cloud Bigtable client using the service account credentials
client = bigtable.Client(
    project=project_id,
    credentials=service_account.Credentials.from_service_account_file(credentials_path),
)

# Connect to an existing Cloud Bigtable instance
instance = client.instance(instance_id)

# Get a handle on the table. instance.table() only builds a local object and
# does not create anything, so the table must already exist in the instance.
table = instance.table(table_id)

# Read the Parquet data into a Spark DataFrame
parquet_df = spark.read.parquet(parquet_file)

# Convert to pandas for simple iteration. This collects every row onto the
# driver, so it is only suitable while the index fits in local memory.
pandas_df = parquet_df.toPandas()

column_family_id = "cf1"
column_id = "c1"

# Number of rows sent per mutate_rows request.
batch_size = 500

# Build one Bigtable row per URL. Keying by row key keeps only the last record
# for a repeated URL; entries for the same row inside one batch request may be
# applied in any order, so duplicates must not be sent together.
rows_by_key = {}
for url, keywords in zip(pandas_df["url"], pandas_df["keywords"]):
    row_key_bytes = str(url).encode("utf-8")
    bigtable_row = table.row(row_key_bytes)
    # The keywords value is stored as its string representation, UTF-8 encoded.
    bigtable_row.set_cell(column_family_id, column_id, str(keywords).encode("utf-8"))
    rows_by_key[row_key_bytes] = bigtable_row

mutations = list(rows_by_key.values())

# Send the rows in batches instead of one request per row, and inspect the
# per-row status that mutate_rows returns (code 0 means the row was applied).
written = 0
failed = []
for start in range(0, len(mutations), batch_size):
    batch = mutations[start:start + batch_size]
    statuses = table.mutate_rows(batch)
    for bigtable_row, status in zip(batch, statuses):
        if status.code == 0:
            written += 1
        else:
            failed.append((bigtable_row.row_key.decode("utf-8"), status.message))

print(f"{written} rows written to Bigtable, {len(failed)} failed.")
for failed_key, message in failed:
    print(f"Row with key '{failed_key}' was not written: {message}")

In [None]:
import os
import pandas as pd
from google.cloud import bigtable
from google.oauth2 import service_account
from google.cloud.bigtable.row_set import RowSet

# Coordinates of the Bigtable table that holds the crawled-link index.
project_id, instance_id, table_id = (
    "web-content-indexing",
    "data-engineering-project",
    "crawled-links",
)

# Service account JSON key file used to authenticate.
credentials_path = "web-content-indexing-78a6a13bc8de.json"

# Publish the key location through GOOGLE_APPLICATION_CREDENTIALS.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path

# Build the Bigtable client from the service account key.
client = bigtable.Client(
    credentials=service_account.Credentials.from_service_account_file(credentials_path),
    project=project_id,
)

# Handle on the existing Bigtable instance.
instance = client.instance(instance_id)

# Handle on the existing table inside that instance.
table = instance.table(table_id)

def _row_to_record(row):
    """Flatten one Bigtable row into a dict for a pandas DataFrame.

    The dict holds the decoded row key under "RowKey" and one entry per
    column, labelled "<family>:<qualifier>", whose value is the UTF-8 decoded
    content of the first cell listed for that column.
    """
    record = {"RowKey": row.row_key.decode("utf-8")}
    for family, columns in row.cells.items():
        for column, cells in columns.items():
            # Column qualifiers are bytes; decode them so the label reads
            # "cf1:c1" instead of "cf1:b'c1'".
            qualifier = column.decode("utf-8") if isinstance(column, bytes) else column
            # NOTE(review): cells[0] is presumably the most recent version of
            # the cell - confirm against the client documentation.
            record[f"{family}:{qualifier}"] = cells[0].value.decode("utf-8")
    return record


# Read data from Bigtable (no row set or filter, so this scans the whole table)
rows = table.read_rows()

# Flatten every row into a dictionary
data = [_row_to_record(row) for row in rows]

# Create a Pandas DataFrame from the data
df = pd.DataFrame(data)

# Display the first rows in Jupyter
df.head()

In [None]:
# Stop the Spark session created in the first code cell; cells that use
# `spark` cannot be run again until that cell is re-executed.
spark.stop()