In [3]:
import pandas as pd
from google.cloud import bigquery
from datetime import datetime, timedelta
from dotenv import load_dotenv
import os
import json
from pandas import json_normalize
from nltk.sentiment import SentimentIntensityAnalyzer
import nltk

In [2]:
def get_raw_news_from_big_query(table='raw_news', project_id='tomastestproject-433206', dataset='testdb_1') -> pd.DataFrame:
    # Set the path to your service account JSON file

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'tomastestproject-433206-adc5bc090976.json'

    table_id = f"{project_id}.{dataset}.{table}"
    # Create a BigQuery client
    client = bigquery.Client()

    # Build your SQL query
    query = f"""
        SELECT *
        FROM `{table_id}`
    """

    # Execute the SQL query
    query_job = client.query(query)

    # Fetch the results
    results = query_job.result()

    # Convert results to a DataFrame
    df = results.to_dataframe()

    # Check if DataFrame is empty and raise an error if needed
    if df.empty:
        raise ValueError(f"No data found")

    return df

In [85]:
df = get_raw_news_from_big_query()



In [175]:
def clean_news(data: pd.DataFrame) -> pd.DataFrame:

    # Förbered DataFrame
    # Se till att 'data' kolumnen är en lista av artiklar
    df['data'] = df['data'].apply(lambda x: x.get(
        'articles', []) if isinstance(x, dict) else [])

    # Explodera artiklar till separata rader
    df_exploded = df.explode('data')

    # Normalisera JSON-data i 'data' kolumnen
    articles_df = json_normalize(df_exploded['data'])

    # Lägg till övriga kolumner
    # Kombinera normaliserad artikeldata med 'company' kolumnen
    final_df = pd.concat(
        [articles_df, df_exploded[['company']].reset_index(drop=True)], axis=1)

    final_df.drop(columns=['content', 'source.id', 'urlToImage'], inplace=True)

    final_df['publishedAt'] = pd.to_datetime(
        final_df['publishedAt'], format='%Y-%m-%dT%H:%M:%SZ', utc=True)

    final_df.rename(columns={"source.name": "source_name",
                             "publishedAt": "pub_date"},
                    inplace=True
                    )
    return final_df

In [150]:
def make_score(string: str) -> float:
    """
    Predicts sentiment for a string. returns a float between -1 and 1.
    """
    sia = SentimentIntensityAnalyzer()
    if string is None:
        return None
    else:
        return sia.polarity_scores(string)['compound']


def predict_sentiment(df: pd.DataFrame):
    """
    Makes scores for each title and description and aggregates the score for each company for each pub date.
    Also adds date of modification as "fetch_date".
    """
    df['score_description'] = df['description'].apply(make_score)
    df['score_title'] = df['title'].apply(make_score)

In [176]:
df = get_raw_news_from_big_query()
clean_df = clean_news(df)



In [149]:
nltk.download("vader_lexicon")

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /Users/tomasrydenstam/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


True

In [177]:
predict_sentiment(clean_df)

In [178]:
clean_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 786 entries, 0 to 785
Data columns (total 9 columns):
 #   Column             Non-Null Count  Dtype              
---  ------             --------------  -----              
 0   author             776 non-null    object             
 1   description        780 non-null    object             
 2   pub_date           786 non-null    datetime64[ns, UTC]
 3   title              786 non-null    object             
 4   url                786 non-null    object             
 5   source_name        786 non-null    object             
 6   company            786 non-null    object             
 7   score_description  780 non-null    float64            
 8   score_title        786 non-null    float64            
dtypes: datetime64[ns, UTC](1), float64(2), object(6)
memory usage: 55.4+ KB


In [179]:
from google.cloud import bigquery

# Initiera BigQuery-klienten
client = bigquery.Client.from_service_account_json(
    'tomastestproject-433206-adc5bc090976.json'
)

# Definiera ditt dataset och tabellnamn
table = 'clean_news'
table_id = f"tomastestproject-433206.testdb_1.{table}"

# Definiera schema med uppdaterat kolumnnamn
schema = [
    bigquery.SchemaField("author", "STRING"),
    bigquery.SchemaField("description", "STRING"),
    bigquery.SchemaField("pub_date", "TIMESTAMP"),
    bigquery.SchemaField("title", "STRING"),
    bigquery.SchemaField("url", "STRING"),
    bigquery.SchemaField("source_name", "STRING"),  # Uppdaterat kolumnnamn
    bigquery.SchemaField("company", "STRING"),
    bigquery.SchemaField("score_description", "FLOAT"),
    bigquery.SchemaField("score_title", "FLOAT"),
]

# Skapa en Tabellreferens
table = bigquery.Table(table_id, schema=schema)

# Skapa Tabell
table = client.create_table(table)  # Här skapas tabellen med table-objektet
print(f"Created table {table_id}")

Created table tomastestproject-433206.testdb_1.clean_news


In [180]:


def write_clean_news_to_bq(data: pd.DataFrame, table='clean_news', project_id='tomastestproject-433206', dataset='testdb_1'):
    # Initiera BigQuery-klienten
    client = bigquery.Client.from_service_account_json(
        'tomastestproject-433206-adc5bc090976.json'
    )

    # Definiera fullständigt tabell-id
    table_id = f"{project_id}.{dataset}.{table}"

    # Ladda DataFrame till BigQuery
    job = client.load_table_from_dataframe(data, table_id)

    # Vänta tills jobbet är klart
    job.result()

    # Kontrollera om det blev fel vid insättning av rader
    if job.errors:
        print(f"Errors: {job.errors}")
    else:
        print("DataFrame har sparats till BigQuery utan fel.")

In [124]:
import pandas as pd


def get_raw_news_from_big_query(table='raw_news', project_id='tomastestproject-433206', dataset='testdb_1'):
    """
    Fetches unprocessed raw news data from a specified BigQuery table and returns it as a pandas DataFrame 
    along with a string of row IDs that were used.

    This function connects to Google BigQuery using credentials from a service account, constructs and executes 
    an SQL query to select rows where `is_processed` is FALSE from the specified table within the provided 
    dataset and project. The results are returned as a pandas DataFrame. It also extracts the `unique_id` from 
    the DataFrame and compiles them into a comma-separated string format for further processing.

    Args:
        table (str): The name of the table in BigQuery to fetch data from. Default is 'raw_news'.
        project_id (str): The Google Cloud project ID where the BigQuery dataset is located. Default is 'tomastestproject-433206'.
        dataset (str): The BigQuery dataset name that contains the table. Default is 'testdb_1'.

    Returns:
        pd.DataFrame: A DataFrame containing the unprocessed raw news data.
        str: A string of `unique_id`s from the fetched rows, formatted as comma-separated values in single quotes.

    Raises:
        ValueError: If no unprocessed data is found in the table.
    """
    # secret_data = get_secret()

    # # Ladda JSON-strängen till en dictionary
    # service_account_info = json.loads(secret_data)

    # # Initiera BigQuery-klienten med service account
    # client = bigquery.Client.from_service_account_info(service_account_info)
    client = bigquery.Client.from_service_account_json(
        'tomastestproject-433206-adc5bc090976.json'
    )

    table_id = f"{project_id}.{dataset}.{table}"
    # unique_id, data,company
    # Build your SQL query
    query = f"""
        SELECT *
        FROM `{table_id}`
        WHERE is_processed IS FALSE
        """

    # Execute the SQL query
    query_job = client.query(query)

    # Fetch the results
    results = query_job.result()

    # Convert results to a DataFrame
    df = results.to_dataframe()

    # Check if DataFrame is empty and raise an error if needed
    if df.empty:
        return None, None

    # Create a comma-separated string of unique IDs
    processed_id_list = df["unique_id"].to_list()
    id_str = ', '.join(f"'{id}'" for id in processed_id_list)

    return df, id_str


def update_is_processed(id_string: str, table='raw_news', project_id='tomastestproject-433206', dataset='testdb_1'):

    table_id = f"{project_id}.{dataset}.{table}"
    # secret_data = get_secret()

    # # Ladda JSON-strängen till en dictionary
    # service_account_info = json.loads(secret_data)

    # # Initiera BigQuery-klienten med service account
    # client = bigquery.Client.from_service_account_info(
    #     service_account_info)

    client = bigquery.Client.from_service_account_json(
        'tomastestproject-433206-adc5bc090976.json'
    )

    # Konstruera SQL-frågan
    query = f"""
    UPDATE `{table_id}`
    SET is_processed = TRUE
    WHERE unique_id IN ({id_string});
    """

    # Kör frågan
    job = client.query(query)
    job.result()  # Vänta på att jobbet ska slutföras


def clean_news(df: pd.DataFrame) -> pd.DataFrame:
    """
        Cleans and transforms raw news data extracted from BigQuery into a structured DataFrame format.

    This function takes a DataFrame containing raw news data, unpacks JSON-like structures to separate rows 
    for each news article, and normalizes the data into a flat table format. The resulting DataFrame will have 
    one row per article with relevant information such as author, description, publication date, title, URL, 
    source, company, and sentiment scores.

    """
    # Förbered DataFrame
    # Se till att 'data' kolumnen är en lista av artiklar
    df.drop(columns=['unique_id'], inplace=True)

    df['data'] = df['data'].apply(lambda x: x.get(
        'articles', []) if isinstance(x, dict) else [])

    # Explodera artiklar till separata rader
    df_exploded = df.explode('data')

    # Normalisera JSON-data i 'data' kolumnen
    articles_df = pd.json_normalize(df_exploded['data'])

    # Lägg till övriga kolumner
    # Kombinera normaliserad artikeldata med 'company' kolumnen
    final_df = pd.concat(
        [articles_df, df_exploded[['company']].reset_index(drop=True)], axis=1)

    # final_df.drop(columns=['content', 'source.id', 'urlToImage'], inplace=True)

    # final_df['publishedAt'] = pd.to_datetime(
    #     final_df['publishedAt'], format='%Y-%m-%dT%H:%M:%SZ', utc=True)

    # final_df.rename(columns={"source.name": "source_name",
    #                          "publishedAt": "pub_date"},
    #                 inplace=True
    #                 )
    return final_df


def make_sentiment_score(string: str) -> float:
    """
    Predicts sentiment for a string. returns a float between -1 and 1.
    """
    sia = SentimentIntensityAnalyzer()
    if string is None:
        return None
    else:
        return sia.polarity_scores(string)['compound']


def predict_sentiment(df: pd.DataFrame):
    """
    Makes scores for each title and description and adds it as "score_description" and "score_title" to the Dataframe.
    """
    df['score_description'] = df['description'].apply(make_sentiment_score)
    df['score_title'] = df['title'].apply(make_sentiment_score)


def write_clean_news_to_bq(data: pd.DataFrame, table='clean_news', project_id='tomastestproject-433206', dataset='testdb_1'):
    """
    Writes cleaned data to Big Query
    """
    # # Initiera BigQuery-klienten
    # secret_data = get_secret()

    # # Ladda JSON-strängen till en dictionary
    # service_account_info = json.loads(secret_data)

    # # Initiera BigQuery-klienten med service account
    # client = bigquery.Client.from_service_account_info(
    #     service_account_info)
    client = bigquery.Client.from_service_account_json(
        'tomastestproject-433206-adc5bc090976.json'
    )

    # Definiera fullständigt tabell-id
    table_id = f"{project_id}.{dataset}.{table}"

    # Ladda DataFrame till BigQuery
    job = client.load_table_from_dataframe(data, table_id)

    # Vänta tills jobbet är klart
    job.result()

    # Kontrollera om det blev fel vid insättning av rader
    if job.errors:
        print(f"Errors: {job.errors}")
    else:
        return f'{job.output_rows} rader sparades till {table}'

In [136]:

df['data'] = df['data'].apply(lambda x: x.get(
    'articles', []) if isinstance(x, dict) else [])

# Explodera artiklar till separata rader
df_exploded = df.explode('data')

# Normalisera JSON-data i 'data' kolumnen
articles_df = pd.json_normalize(df_exploded['data'])

# Lägg till övriga kolumner
# Kombinera normaliserad artikeldata med 'company' kolumnen
final_df = pd.concat(
    [articles_df, df_exploded[['company']].reset_index(drop=True)], axis=1)

# final_df.drop(columns=['content', 'source.id', 'urlToImage'], inplace=True)

# final_df['publishedAt'] = pd.to_datetime(
#     final_df['publishedAt'], format='%Y-%m-%dT%H:%M:%SZ', utc=True)

# final_df.rename(columns={"source.name": "source_name",
#                             "publishedAt": "pub_date"},
#                 inplace=True
#                 )

In [151]:
df, ids = get_raw_news_from_big_query(
    table='raw_news_with_uuid', project_id='tomastestproject-433206', dataset='testdb_1')

I0000 00:00:1725018441.377165 7426783 check_gcp_environment_no_op.cc:29] ALTS: Platforms other than Linux and Windows are not supported


In [152]:
df

Unnamed: 0,unique_id,company,fetch_date,data,is_processed
0,476e0453-2746-4128-afef-89cb2f6c162b,AAPL,2024-08-29 09:15:16+00:00,"{""articles"":[{""author"":""applech2"",""content"":""A...",False
1,2260b501-7b08-4807-94a5-dfedce28dde4,AAPL,2024-08-29 09:15:16+00:00,"{""articles"":[{""author"":""applech2"",""content"":""A...",False
2,981f711c-dacc-4126-bc9e-4b9678ea9f76,AAPL,2024-08-29 10:55:40+00:00,"{""articles"":[{""author"":""applech2"",""content"":""A...",False
3,1954ea9f-280b-4433-8707-75cfa8659c73,AAPL,2024-08-29 10:47:56+00:00,"{""articles"":[{""author"":""applech2"",""content"":""A...",False
4,058ab43e-c98f-4923-9635-32d03682a194,AAPL,2024-08-29 10:55:40+00:00,"{""articles"":[{""author"":""applech2"",""content"":""A...",False
...,...,...,...,...,...
99,bc448599-4b7d-4a88-ac3e-158bbaf0dcaf,TSLA,2024-08-28 09:56:29+00:00,"{""articles"":[{""author"":""William Gavin"",""conten...",False
100,ad4294b9-9cf1-45d0-9bea-84abf081544a,TSLA,2024-08-27 20:47:09+00:00,"{""articles"":[{""author"":""Ana Altchek"",""content""...",False
101,dfb0f8bd-cc24-4250-aa84-f45832300251,TSLA,2024-08-27 20:47:09+00:00,"{""articles"":[{""author"":""Ana Altchek"",""content""...",False
102,6af3087a-f252-46f0-9d29-4543bf81c804,TSLA,2024-08-27 16:06:31+00:00,"{""articles"":[{""author"":""Bradley Brownell"",""con...",False


In [150]:
import pandas as pd
import json


def clean_news_3(df: pd.DataFrame) -> pd.DataFrame:
    """
    Cleans and transforms raw news data extracted from BigQuery into a structured DataFrame format.

    This function takes a DataFrame containing raw news data, unpacks JSON-like structures to separate rows 
    for each news article, and normalizes the data into a flat table format. The resulting DataFrame will have 
    one row per article with relevant information such as author, description, publication date, title, URL, 
    source, company, and sentiment scores.

    Args:
        df (pd.DataFrame): DataFrame containing raw news data with columns `unique_id`, `company`, `fetch_date`, `data`, and `is_processed`.

    Returns:
        pd.DataFrame: A DataFrame where each row represents a single news article with additional columns from the `company` column.
    """
    def extract_articles(json_obj):
        if isinstance(json_obj, dict) and 'articles' in json_obj:
            return json_obj['articles']
        return []

    # Förbered DataFrame
    df['data'] = df['data'].apply(
        lambda x: json.loads(x) if isinstance(x, str) else {})
    df['data'] = df['data'].apply(extract_articles)

    # Explodera artiklar till separata rader
    df_exploded = df.explode('data')

    # Normalisera JSON-data i 'data' kolumnen
    articles_df = pd.json_normalize(df_exploded['data'])

    # Lägg till övriga kolumner
    final_df = pd.concat(
        [articles_df, df_exploded[['company']].reset_index(drop=True)], axis=1)

    # Droppa eventuellt onödiga kolumner
    final_df.drop(columns=['content', 'source.id',
                  'urlToImage'], inplace=True, errors='ignore')

    # Omvandla 'publishedAt' till datetime format
    final_df['publishedAt'] = pd.to_datetime(
        final_df['publishedAt'], format='%Y-%m-%dT%H:%M:%SZ', utc=True, errors='coerce')

    # Omvandla kolumnnamn för läsbarhet
    final_df.rename(columns={"source.name": "source_name",
                    "publishedAt": "pub_date"}, inplace=True)

    return final_df

In [153]:
dff = clean_news_3(df)

In [162]:
df, ids = get_raw_news_from_big_query(
    table='raw_news_data', project_id='tomastestproject-433206', dataset='testdb_1')

I0000 00:00:1725027015.377527 7426783 check_gcp_environment_no_op.cc:29] ALTS: Platforms other than Linux and Windows are not supported


In [166]:
update_is_processed(id_string=ids, table='raw_news_data')

In [155]:
df, ids = get_raw_news_from_big_query(
    table='raw_news_with_uuid', project_id='tomastestproject-433206', dataset='testdb_1')

# Rensa nyhetsdata
dff = clean_news_3(df)

# Gör sentimentanalyser
predict_sentiment(df=dff)

# Skriv de rensade nyheterna till BigQuery och få antalet rader som skrevs
rows_written = write_clean_news_to_bq(data=dff, table='clean_news_copy')

update_is_processed(id_string=ids, table='raw_news_with_uuid')

### FORTSÄTT HÄR ###

In [167]:
from google.cloud import bigquery
import pandas as pd  # Importera pandas för att hantera DataFrames

# Initialisera BigQuery-klienten
client = bigquery.Client.from_service_account_json(
    'tomastestproject-433206-adc5bc090976.json')

# SQL-fråga för att hämta data från tabellen
query = """ 
SELECT unique_id, data 
FROM `tomastestproject-433206.testdb_1.raw_news_data`
WHERE is_processed IS FALSE
"""
# query_2="""
# UPDATE `tomastestproject-433206.testdb_1.raw_news_with_uuid` SET is_processed = TRUE WHERE is_processed IS null


# """
# Kör frågan
job = client.query(query)

# Vänta på att jobbet ska slutföras och hämta resultaten som en DataFrame
# df = job.to_dataframe()

# Visa de första raderna i DataFrame
# print(df.head())

In [177]:
job.result().total_rows

0

In [15]:
processed_id_list = df["unique_id"].to_list()
id_str = ', '.join(f"'{id}'" for id in processed_id_list)

In [17]:
# Konvertera listan till en SQL-kompatibel sträng
id_str = ', '.join(f"'{id}'" for id in processed_id_list)

# Konstruera SQL-frågan
query = f"""
UPDATE `tomastestproject-433206.testdb_1.raw_news_with_uuid`
SET is_processed = TRUE
WHERE unique_id IN ({id_str});
"""

# Kör frågan
job = client.query(query)
job.result()  # Vänta på att jobbet ska slutföras

print("Tabellen har uppdaterats med is_processed = TRUE för angivna ID:n.")

Tabellen har uppdaterats med is_processed = TRUE för angivna ID:n.


In [10]:
def get_raw_news_from_big_query(table='raw_news', project_id='tomastestproject-433206', dataset='testdb_1'):
    """
    Fetches unprocessed raw news data from a specified BigQuery table and returns it as a pandas DataFrame 
    along with a string of row IDs that were used.

    This function connects to Google BigQuery using credentials from a service account, constructs and executes 
    an SQL query to select rows where `is_processed` is FALSE from the specified table within the provided 
    dataset and project. The results are returned as a pandas DataFrame. It also extracts the `unique_id` from 
    the DataFrame and compiles them into a comma-separated string format for further processing.

    Args:
        table (str): The name of the table in BigQuery to fetch data from. Default is 'raw_news'.
        project_id (str): The Google Cloud project ID where the BigQuery dataset is located. Default is 'tomastestproject-433206'.
        dataset (str): The BigQuery dataset name that contains the table. Default is 'testdb_1'.

    Returns:
        pd.DataFrame: A DataFrame containing the unprocessed raw news data.
        str: A string of `unique_id`s from the fetched rows, formatted as comma-separated values in single quotes.

    Raises:
        ValueError: If no unprocessed data is found in the table.
    """

    # Initiera BigQuery-klienten med service account
    client = bigquery.Client.from_service_account_json(
        '/Users/tomasrydenstam/Desktop/Skola/test_project/transform_news_2/tomastestproject-433206-adc5bc090976.json')

    table_id = f"{project_id}.{dataset}.{table}"

    # Build your SQL query
    query = f"""
        SELECT unique_id, data
        FROM `{table_id}`
        WHERE is_processed IS FALSE
        """

    # Execute the SQL query
    query_job = client.query(query)

    # Fetch the results
    results = query_job.result()

    # Convert results to a DataFrame
    df = results.to_dataframe()

    # Check if DataFrame is empty and raise an error if needed
    if df.empty:
        raise ValueError("No unprocessed data found")

    # Create a comma-separated string of unique IDs
    processed_id_list = df["unique_id"].to_list()
    if processed_id_list:
        id_str = ', '.join(f"'{id}'" for id in processed_id_list)

    return df, id_str

In [7]:
def update_is_processed(id_string: str, table='raw_news', project_id='tomastestproject-433206', dataset='testdb_1'):

    table_id = f"{project_id}.{dataset}.{table}"
    client = bigquery.Client.from_service_account_json(
        '/Users/tomasrydenstam/Desktop/Skola/test_project/transform_news_2/tomastestproject-433206-adc5bc090976.json')

    # Konstruera SQL-frågan
    query = f"""
    UPDATE `{table_id}`
    SET is_processed = TRUE
    WHERE unique_id IN ({id_string});
    """

    # Kör frågan
    job = client.query(query)
    job.result()  # Vänta på att jobbet ska slutföras

In [12]:
df, stttrr = get_raw_news_from_big_query(table='raw_news_with_uuid')



In [6]:
stttrr

"'476e0453-2746-4128-afef-89cb2f6c162b', '2260b501-7b08-4807-94a5-dfedce28dde4', 'b0138e6e-e669-4624-8254-f9351405b773', '981f711c-dacc-4126-bc9e-4b9678ea9f76', '1954ea9f-280b-4433-8707-75cfa8659c73', '058ab43e-c98f-4923-9635-32d03682a194', 'e3147755-c6d1-4b1f-b74e-b962b37dcdfc', '6036c725-5fea-4e50-9ce3-f743446be23d', 'd3059d6a-1d57-4315-9f74-13968e13b3fa', '887e0e21-1fdd-44ab-a64d-6f1bbd78fa01', 'c5564d54-7a41-466b-a428-0df6dd00fde4', '2e3d4551-0fe0-4703-96a7-58d0d0000bd6', '1bea4b39-27d0-4616-91da-8b3ce929bd4f', '8a8dd974-b83f-4714-b40e-0b562d410645', '33bb229d-f0f9-4854-a313-277334e1bd30', '8a3f901a-ad64-4e0d-8a0f-9171289f636f', '33449269-5bb4-4c8e-9200-92a392c5e24b', '53849022-af6b-4b6e-b5b8-9e764ec32d5d', 'ea1669f8-8a08-4803-a4fb-3730062189ad', '13bfd93f-bbed-43c4-8d8b-9436cb8b004a', '5008cc35-eb61-4349-9a78-f47bb6e60007', '5f393783-961e-46a2-a91d-8ff954e7967e', '6a6264e0-c982-4ee4-aab2-1fa61cc20d3e', '602f0d69-93b2-425c-a835-1953ce87a209', '356d69da-4484-4fff-8053-edb37d288f27',

In [13]:
update_is_processed(id_string=stttrr, table='raw_news_with_uuid')

In [185]:
def transfer_ids_to_meta_data(table_from='raw_news_data', table_to='raw_news_meta_data', project_id='tomastestproject-433206', dataset='testdb_1', secret='bigquery-accout-secret'):
    try:
        # Initiera BigQuery-klienten

        client = bigquery.Client.from_service_account_json(
            '/Users/tomasrydenstam/Desktop/Skola/test_project/transform_news_2/tomastestproject-433206-adc5bc090976.json')

        # Definiera din dataset och tabell
        meta_data_table = f"{project_id}.{dataset}.{table_to}"
        raw_data_table = f"{project_id}.{dataset}.{table_from}"
        query = f"""
                    INSERT INTO `{meta_data_table}` (unique_id, is_processed)
                    SELECT unique_id, FALSE
                    FROM `{raw_data_table}`
                    WHERE unique_id NOT IN (
                    SELECT unique_id
                    FROM `{meta_data_table}`)
                """

        errors = client.query(query)

    except NotFound:
        print(f"Error: The table {meta_data_table} was not found.")
        raise

    except GoogleAPIError as e:
        print(f"Google API Error: {e}")
        raise

    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        raise

In [186]:
asd = transfer_ids_to_meta_data()

In [188]:
asd.to_dataframe()

I0000 00:00:1725042354.889144 7426783 check_gcp_environment_no_op.cc:29] ALTS: Platforms other than Linux and Windows are not supported


Unnamed: 0,unique_id
0,2260b501-7b08-4807-94a5-dfedce28dde4
1,476e0453-2746-4128-afef-89cb2f6c162b
2,058ab43e-c98f-4923-9635-32d03682a194
3,981f711c-dacc-4126-bc9e-4b9678ea9f76
4,1954ea9f-280b-4433-8707-75cfa8659c73
...,...
114,7905cefb-5ac4-4c17-9153-8f87359cf245
115,6de10562-9341-4ddb-b6c8-2bb86eec7f31
116,02d439c5-4356-4b36-a8e7-e615f20c7b1f
117,4aab533c-842e-41a0-a222-ee4d0c3cea52


In [192]:
def get_raw_news_from_big_query(raw_data_table='raw_news_data', meta_data_table='raw_news_meta_data', project_id='tomastestproject-433206', dataset='testdb_1'):
    """
    Fetches unprocessed raw news data from a specified BigQuery table and returns it as a pandas DataFrame 
    along with a string of row IDs that were used.

    This function connects to Google BigQuery using credentials from a service account, constructs and executes 
    an SQL query to select rows where `is_processed` is FALSE from the specified table within the provided 
    dataset and project. The results are returned as a pandas DataFrame. It also extracts the `unique_id` from 
    the DataFrame and compiles them into a comma-separated string format for further processing.

    Args:
        table (str): The name of the table in BigQuery to fetch data from. Default is 'raw_news'.
        project_id (str): The Google Cloud project ID where the BigQuery dataset is located. Default is 'tomastestproject-433206'.
        dataset (str): The BigQuery dataset name that contains the table. Default is 'testdb_1'.

    Returns:
        pd.DataFrame: A DataFrame containing the unprocessed raw news data.
        str: A string of `unique_id`s from the fetched rows, formatted as comma-separated values in single quotes.

    Raises:
        ValueError: If no unprocessed data is found in the table.xw
    """

    client = bigquery.Client.from_service_account_json(
        '/Users/tomasrydenstam/Desktop/Skola/test_project/transform_news_2/tomastestproject-433206-adc5bc090976.json')

    raw_data_table_id = f"{project_id}.{dataset}.{raw_data_table}"
    meta_data_table_id = f"{project_id}.{dataset}.{meta_data_table}"

    # Build your SQL query
    query = f"""
        SELECT unique_id, data,company
        FROM `{raw_data_table_id}`
        WHERE unique_id IN (SELECT unique_id FROM `{meta_data_table_id}` WHERE is_processed IS FALSE)
        """

    # Execute the SQL query
    query_job = client.query(query)

    # Fetch the results
    results = query_job.result()

    # Convert results to a DataFrame
    df = results.to_dataframe()
    return df

In [193]:
test_df = get_raw_news_from_big_query()

I0000 00:00:1725080798.727240 7426783 check_gcp_environment_no_op.cc:29] ALTS: Platforms other than Linux and Windows are not supported


In [194]:
test_df

Unnamed: 0,unique_id,data,company
0,2260b501-7b08-4807-94a5-dfedce28dde4,"{""articles"":[{""author"":""applech2"",""content"":""A...",AAPL
1,476e0453-2746-4128-afef-89cb2f6c162b,"{""articles"":[{""author"":""applech2"",""content"":""A...",AAPL
2,058ab43e-c98f-4923-9635-32d03682a194,"{""articles"":[{""author"":""applech2"",""content"":""A...",AAPL
3,981f711c-dacc-4126-bc9e-4b9678ea9f76,"{""articles"":[{""author"":""applech2"",""content"":""A...",AAPL
4,1954ea9f-280b-4433-8707-75cfa8659c73,"{""articles"":[{""author"":""applech2"",""content"":""A...",AAPL
...,...,...,...
119,b83338ba-820d-490d-8255-c8101e820f2f,"{""articles"":[{""author"":""Rocio Fabbro"",""content...",AAPL
120,a8f4664c-d4f5-41c8-ad66-1a0842cefc86,"{""articles"":[{""author"":""Rocio Fabbro"",""content...",GOOGL
121,cf6d1eca-e44a-46a2-915c-f3d9fbde60f4,"{""articles"":[{""author"":""William Gavin"",""conten...",MSFT
122,073b2365-cb48-423b-8148-bb1477d64432,"{""articles"":[{""author"":""Asra Isar"",""content"":""...",AMZN
