In [2]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
from transformers import DistilBertTokenizer
from transformers import TextClassificationPipeline
from transformers import TFDistilBertForSequenceClassification, TFTrainer, TFTrainingArguments
import tensorflow as tf
from sklearn.model_selection import train_test_split
import datetime as dtm
import time
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pymongo
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

  from .autonotebook import tqdm as notebook_tqdm




In [3]:
class WebScraper:
    """Download a news page and store its ``<h3>`` headlines in an Excel file.

    Args:
        url: Address of the page to scrape.
        path: Output directory prefix. It is concatenated directly with the
            file name, so it should end with a path separator (e.g. ``'data/'``).
        timeout: Seconds to wait for the HTTP response before giving up.
    """

    # Lower-case accented vowels map to plain vowels; the listed punctuation
    # (including Spanish opening marks and typographic quotes) is deleted.
    _NORMALIZE = str.maketrans('áéíóú', 'aeiou', '¿?¡!(),.;:«»"“”…–-—’‘’')

    def __init__(self, url, path, timeout=30):
        self.url = url
        self.path = path
        self.timeout = timeout

    @staticmethod
    def remove_accents(text):
        """Lower-case ``text``, strip acute accents from vowels and drop punctuation."""
        return text.lower().translate(WebScraper._NORMALIZE)

    def scrape(self):
        """Return the normalised text of every ``<h3>`` element on ``self.url``.

        Raises:
            requests.HTTPError: if the server answers with an error status,
                instead of silently scraping the error page.
        """
        response = requests.get(self.url, timeout=self.timeout)
        response.raise_for_status()
        # Force UTF-8 so accented characters decode correctly before normalising.
        response.encoding = 'utf-8'
        soup = BeautifulSoup(response.text, 'html.parser')
        return [self.remove_accents(h3.text.strip()) for h3 in soup.find_all('h3')]

    def create_dataframe(self, news=None):
        """Write headlines to ``<path>news_<YYYY-MM-DD>.xlsx`` and return a status message.

        Args:
            news: Headlines to write. When omitted, the page is scraped now.
        """
        date = dtm.datetime.now().strftime("%Y-%m-%d")
        if news is None:
            news = self.scrape()
        df = pd.DataFrame({'news': list(news)})
        # index=False: otherwise the row index is read back by the later cells
        # as an extra 'Unnamed: 0' column.
        df.to_excel(self.path + 'news_' + date + '.xlsx', index=False)
        return "Archivo creado"

    def main(self):
        """Scrape the page once and save the result; returns the status message."""
        return self.create_dataframe(self.scrape())

In [None]:
def generate_datanews(url, path='data/'):
    """Scrape ``url`` and write today's headlines to ``<path>news_<YYYY-MM-DD>.xlsx``.

    Args:
        url: Page whose ``<h3>`` headlines are collected.
        path: Output directory prefix, ending with a separator. Defaults to
            ``'data/'``; the prediction, database and report cells read
            today's file from ``'news/'``.
    """
    scraper = WebScraper(url, path)
    result = scraper.main()
    print(result)

if __name__ == '__main__':
    # Source site for the daily headlines.
    url = 'https://www.elcolombiano.com'
    # Write where NewsPredictor / DataBase / DataReporter look for today's file.
    generate_datanews(url, 'news/')

In [41]:
class TextClassification:
    """Fine-tune DistilBERT to label headlines as negative (0), neutral (1) or positive (2).

    Typical order: ``data_reading`` -> ``train_test_split`` -> ``tokenization``
    -> ``modeling``. The fine-tuned model and its tokenizer are saved to
    ``/saved_models``.
    """

    # Spreadsheet 'tag' values -> integer class ids used as training labels.
    LABELS = {'negativa': 0, 'neutra': 1, 'positiva': 2}

    def __init__(self):
        self.model = None
        self.tokenizer = None

    def data_reading(self):
        """Load ``data/news.xlsx`` and return ``(texts, labels)`` as parallel lists.

        Rows with an empty headline or a tag outside ``LABELS`` are dropped.
        Otherwise they would become NaN texts/labels, breaking tokenization
        and the integer label tensor.
        """
        df = pd.read_excel('data/news.xlsx', sheet_name='Sheet1')
        df['encoded_tag'] = df['tag'].map(self.LABELS)
        df = df.dropna(subset=['news', 'encoded_tag'])
        data_texts = df['news'].astype(str).to_list()
        data_labels = df['encoded_tag'].astype(int).to_list()
        return data_texts, data_labels

    def train_test_split(self, data_texts, data_labels):
        """Split into train / validation / test lists.

        20% of the data goes to validation, then 1% of the remainder to test.

        Returns:
            ``(train_texts, val_texts, test_texts, train_labels, val_labels, test_labels)``
        """
        # The bare name below resolves to the module-level sklearn function,
        # not to this method (methods are not in scope as bare names).
        train_texts, val_texts, train_labels, val_labels = train_test_split(
            data_texts, data_labels, test_size=0.2, random_state=0)
        train_texts, test_texts, train_labels, test_labels = train_test_split(
            train_texts, train_labels, test_size=0.01, random_state=0)
        return train_texts, val_texts, test_texts, train_labels, val_labels, test_labels

    def tokenization(self, train_texts, val_texts, train_labels, val_labels):
        """Tokenize the splits and wrap them as ``tf.data.Dataset`` of ``(features, label)``."""
        self.tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
        train_encodings = self.tokenizer(train_texts, truncation=True, padding=True)
        val_encodings = self.tokenizer(val_texts, truncation=True, padding=True)
        train_dataset = tf.data.Dataset.from_tensor_slices((
            dict(train_encodings),
            train_labels
        ))

        val_dataset = tf.data.Dataset.from_tensor_slices((
            dict(val_encodings),
            val_labels
        ))
        return train_dataset, val_dataset

    def modeling(self, train_dataset, val_dataset):
        """Fine-tune, evaluate and save the classifier.

        Returns:
            ``(metrics, save_directory)``, where ``metrics`` is the dict returned
            by ``TFTrainer.evaluate()`` (e.g. ``{'eval_loss': ...}``).
        """
        training_args = TFTrainingArguments(
            output_dir='./results',
            num_train_epochs=7,
            per_device_train_batch_size=16,
            per_device_eval_batch_size=64,
            warmup_steps=500,
            weight_decay=1e-5,
            logging_dir='./logs',
            eval_steps=100
        )

        # Build the model inside the strategy scope and keep this instance on
        # self. It is the one the trainer fine-tunes, so it is the one that
        # must be saved. Saving a separately loaded copy would persist untrained
        # classifier weights.
        with training_args.strategy.scope():
            self.model = TFDistilBertForSequenceClassification.from_pretrained(
                'distilbert-base-uncased', num_labels=3)

        trainer = TFTrainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
        )
        trainer.train()
        metrics = trainer.evaluate()

        if self.tokenizer is None:
            # modeling() called without tokenization(): save the matching base tokenizer.
            self.tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

        save_directory = "/saved_models"
        self.model.save_pretrained(save_directory)
        self.tokenizer.save_pretrained(save_directory)
        return metrics, save_directory

if __name__ == '__main__':
    # Train on the labelled spreadsheet, then show the evaluation metrics.
    classifier = TextClassification()
    texts, labels = classifier.data_reading()
    splits = classifier.train_test_split(texts, labels)
    train_texts, val_texts, _, train_labels, val_labels, _ = splits
    train_ds, val_ds = classifier.tokenization(train_texts, val_texts, train_labels, val_labels)
    accuracy, save_dir = classifier.modeling(train_ds, val_ds)
    print('Accuracy: ', accuracy)

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFDistilBertForSequenceClassification: ['vocab_projector.bias', 'vocab_transform.bias', 'vocab_layer_norm.bias', 'vocab_layer_norm.weight', 'vocab_transform.weight']
- This IS expected if you are initializing TFDistilBertForSequenceClassification from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFDistilBertForSequenceClassification from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
Some weights or buffers of the TF 2.0 model TFDistilBertForSequenceClassification were not initialized from the PyTorch model and are newly initialized: ['pre_classifier.weight', 'pre_classifier.bias', 'classifier.weight', 'classifier.bias']
You should 

Accuracy:  {'eval_loss': 1.0505685806274414}


In [5]:
class ModelLoader:
    """Load the fine-tuned DistilBERT classifier and predict the class of a headline.

    Args:
        model_dir: Directory previously written with ``save_pretrained``.
            Defaults to the location used by the training cell.
    """

    def __init__(self, model_dir="/saved_models"):
        self.model_dir = model_dir
        self.tokenizer = DistilBertTokenizer.from_pretrained(model_dir, verbose=False)
        self.model = TFDistilBertForSequenceClassification.from_pretrained(model_dir)

    def load_model(self, test_texts):
        """Predict the class index for ``test_texts``.

        Despite the name, this method runs inference; the model is loaded in
        ``__init__``. When several texts are given, only the first prediction
        is returned.

        Returns:
            ``(prediction_value, test_texts)``: the argmax class id
            (0 = negativa, 1 = neutra, 2 = positiva, per the training labels)
            and the input echoed back.
        """
        # Call the tokenizer (not ``encode``) so the attention mask reaches the
        # model and a list of texts is batched rather than read as pre-split tokens.
        predict_input = self.tokenizer(
            test_texts,
            truncation=True,
            padding=True,
            return_tensors='tf'
        )
        logits = self.model(dict(predict_input))[0]
        prediction_value = tf.argmax(logits, axis=1).numpy()[0]
        return prediction_value, test_texts

if __name__ == '__main__':
    # Spot-check the saved model on the 8th labelled headline.
    headlines = pd.read_excel('data/news.xlsx', sheet_name='Sheet1')['news'].to_list()
    sample = headlines[7]
    loader = ModelLoader()
    prediction_value, test_texts = loader.load_model(sample)
    print(prediction_value, test_texts)

Some layers from the model checkpoint at /saved_models were not used when initializing TFDistilBertForSequenceClassification: ['dropout_279']
- This IS expected if you are initializing TFDistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFDistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some layers of TFDistilBertForSequenceClassification were not initialized from the model checkpoint at /saved_models and are newly initialized: ['dropout_19']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


2 la reina del tiro con arco la colombiana sara lopez se corono campeona del mundial de tiro con arco


In [40]:
def test_model():
    """Smoke-test the saved classifier on the 8th headline of ``data/news.xlsx``."""
    sample_text = pd.read_excel('data/news.xlsx', sheet_name='Sheet1')['news'].to_list()[7]
    prediction_value, text = ModelLoader().load_model(sample_text)
    print('New: ', text)
    print('Prediction: ', prediction_value)

if __name__ == '__main__':
    test_model()

Some layers from the model checkpoint at /saved_models were not used when initializing TFDistilBertForSequenceClassification: ['dropout_199']
- This IS expected if you are initializing TFDistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFDistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some layers of TFDistilBertForSequenceClassification were not initialized from the model checkpoint at /saved_models and are newly initialized: ['dropout_259']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


New:  la reina del tiro con arco la colombiana sara lopez se corono campeona del mundial de tiro con arco
Prediction:  1


In [6]:
class NewsPredictor:
    """Classify today's scraped headlines and store the predicted label next to each one.

    Args:
        directory: Folder holding ``news_<YYYY-MM-DD>.xlsx``.
        model_loader: Object exposing ``load_model(text) -> (label, text)``.
            Defaults to a ``ModelLoader`` built on first use.
    """

    def __init__(self, directory='news', model_loader=None):
        self.directory = directory
        self.model_loader = model_loader

    def predict_and_save_to_excel(self):
        """Add a ``prediction`` column to today's file and overwrite it in place."""
        date = dtm.datetime.now().strftime("%Y-%m-%d")
        file_path = f'{self.directory}/news_{date}.xlsx'
        df = pd.read_excel(file_path, sheet_name='Sheet1')
        # Drop index columns left by earlier index=True writes ('Unnamed: N').
        # Also drop blank headlines, which Excel reads back as NaN and the
        # tokenizer cannot encode.
        df = (
            df.drop(columns=[c for c in df.columns if str(c).startswith('Unnamed')])
            .dropna(subset=['news'])
            .reset_index(drop=True)
        )
        if self.model_loader is None:
            self.model_loader = ModelLoader()
        predictions = [self.model_loader.load_model(text)[0] for text in df['news'].to_list()]
        df = df.assign(prediction=predictions)
        # index=False so the next reader does not see an extra 'Unnamed: 0' column.
        df.to_excel(file_path, index=False)

if __name__ == '__main__':
    predictor = NewsPredictor()
    predictor.predict_and_save_to_excel()


Some layers from the model checkpoint at /saved_models were not used when initializing TFDistilBertForSequenceClassification: ['dropout_279']
- This IS expected if you are initializing TFDistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFDistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some layers of TFDistilBertForSequenceClassification were not initialized from the model checkpoint at /saved_models and are newly initialized: ['dropout_39']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [8]:
# Module-level MongoDB handles used by the DataBase cell below.
client = pymongo.MongoClient('mongodb://localhost:27017/')
db = client['News_colombiano']
collection = db['prediction_collection']

class DataBase:
    """Insert today's prediction spreadsheet into MongoDB, one document per row.

    Args:
        target_collection: Collection to write into. When omitted, the
            module-level ``collection`` defined above is used.
        directory: Folder holding ``news_<YYYY-MM-DD>.xlsx``.
    """

    def __init__(self, target_collection=None, directory='news'):
        self.target_collection = target_collection
        self.directory = directory

    def savedata(self):
        """Read today's file and insert its rows; returns a status message."""
        date = dtm.datetime.now().strftime("%Y-%m-%d")
        news = pd.read_excel(f'{self.directory}/news_{date}.xlsx', sheet_name='Sheet1')
        # Files written by ``to_excel`` with the default index=True come back
        # with 'Unnamed: N' index columns; they are not data, so keep them out of Mongo.
        news = news.drop(columns=[c for c in news.columns if str(c).startswith('Unnamed')])
        records = news.to_dict('records')
        if not records:
            # insert_many rejects an empty document list.
            return "No data to save"
        target = self.target_collection if self.target_collection is not None else collection
        target.insert_many(records)
        return "Data saved"

if __name__ == '__main__':
    # Persist today's predictions.
    DataBase().savedata()

In [None]:
class DataReporter:
    """E-mail a digest of the headlines predicted negative (0) and positive (2).

    Args:
        news: DataFrame with ``news`` and ``prediction`` columns, or the path
            of an Excel file with those columns (read when the report is built).
        email: Gmail address used to log in and send; also the default recipient.
        password: Password for ``email``.
    """

    def __init__(self, news, email, password):
        self.news = news
        self.email = email
        self.password = password

    def _news_frame(self):
        """Return the headlines as a DataFrame, loading them from disk if a path was given."""
        if isinstance(self.news, pd.DataFrame):
            return self.news
        return pd.read_excel(self.news, sheet_name='Sheet1')

    def report(self):
        """Return ``{'negative_news': str, 'positive_news': str}`` with one headline per line."""
        news = self._news_frame()

        def joined(label):
            # Blank headlines come back from Excel as NaN; skip them so join() works.
            texts = news.loc[news['prediction'] == label, 'news'].dropna().astype(str)
            return '\n'.join(texts)

        return {
            'negative_news': joined(0),
            'positive_news': joined(2)
        }

    def send_email(self, to_email=None):
        """E-mail the report through Gmail's SMTP server.

        Args:
            to_email: Recipient address; defaults to ``self.email``.

        Returns:
            True if the message was handed to the server, False otherwise.
            Errors are printed, not raised (best-effort delivery).
        """
        recipient = to_email if to_email is not None else self.email
        reporte = self.report()
        smtp_server = 'smtp.gmail.com'
        smtp_port = 587
        smtp_username = self.email
        smtp_password = self.password

        msg = MIMEMultipart()
        msg['From'] = smtp_username
        msg['To'] = recipient
        msg['Subject'] = 'Informe de noticias'

        body = f'''
        Noticias Negativas:
        {reporte['negative_news']}

        Noticias Positivas:
        {reporte['positive_news']}
        '''
        msg.attach(MIMEText(body, 'plain'))

        try:
            # The context manager closes the connection even if login/sendmail fails.
            with smtplib.SMTP(smtp_server, smtp_port) as server:
                server.starttls()
                server.login(smtp_username, smtp_password)
                server.sendmail(smtp_username, recipient, msg.as_string())
            print("Correo enviado con éxito")
            return True
        except Exception as e:
            print(f"Error al enviar el correo: {str(e)}")
            return False

if __name__ == '__main__':
    import os  # credentials come from the environment, not the notebook source

    date = dtm.datetime.now().strftime("%Y-%m-%d")
    # Same folder the prediction and database cells use for today's file.
    path = f'news/news_{date}.xlsx'
    to_email = os.environ.get('NEWS_REPORT_EMAIL', 'sergio.quintero.1804@gmail.com')
    password = os.environ.get('NEWS_REPORT_PASSWORD', '')
    # Pass the loaded frame: report() filters on its 'prediction' column.
    news = pd.read_excel(path, sheet_name='Sheet1')
    visualizer = DataReporter(news, to_email, password)
    # The sender's own address is the recipient here.
    visualizer.send_email()

In [24]:
# Airflow pipeline that runs the notebook stages once a day:
# scrape -> predict -> store in MongoDB -> e-mail report.
NEWS_URL = 'https://www.elcolombiano.com'
# Folder (with trailing separator) shared by every stage for news_<date>.xlsx.
NEWS_DIR = 'news/'

default_args = {
    'owner': 'tu_nombre',
    'start_date': datetime(2023, 9, 12, 6, 0, 0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'web_scraping_and_prediction',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
)

def run_web_scraper():
    """Scrape today's headlines into ``NEWS_DIR``."""
    WebScraper(NEWS_URL, NEWS_DIR).main()

web_scraping_task = PythonOperator(
    task_id='web_scraping_task',
    python_callable=run_web_scraper,
    dag=dag,
)

def run_prediction():
    """Classify today's headlines and write the predictions back to the file."""
    NewsPredictor().predict_and_save_to_excel()

prediction_task = PythonOperator(
    task_id='prediction_task',
    python_callable=run_prediction,
    dag=dag,
)

def run_database():
    """Insert today's predictions into MongoDB."""
    DataBase().savedata()

database_task = PythonOperator(
    task_id='database_task',
    python_callable=run_database,
    dag=dag,
)

def run_report():
    """E-mail the negative/positive digest; credentials come from the environment."""
    import os

    date = dtm.datetime.now().strftime("%Y-%m-%d")
    news = pd.read_excel(f'{NEWS_DIR}news_{date}.xlsx', sheet_name='Sheet1')
    # KeyError here makes the task fail visibly when the credentials are not configured.
    email = os.environ['NEWS_REPORT_EMAIL']
    password = os.environ['NEWS_REPORT_PASSWORD']
    DataReporter(news, email, password).send_email()

report_task = PythonOperator(
    task_id='report_task',
    python_callable=run_report,
    dag=dag,
)

web_scraping_task >> prediction_task >> database_task >> report_task

if __name__ == "__main__":
    dag.cli()