In [1]:
import requests
from bs4 import BeautifulSoup
from  settings import BASE_URL, USER_AGENT, AUTHORS,QUESTION_CONTEXT, DS_API_KEY
import pandas as pd
import re
from datetime import datetime, timedelta
import json
from openai import OpenAI

In [5]:
s = "Напиши два ответа на основании предложенного текста. Первый ответ - название компании, о которой идет речь в тексте. Второй ответ - направление цены, напиши \"up\" или \"down\". Ответ должен выглядить так: \"Название тикета компании на биржи. Число\""

In [6]:
print(s)

Напиши два ответа на основании предложенного текста. Первый ответ - название компании, о которой идет речь в тексте. Второй ответ - направление цены, напиши "up" или "down". Ответ должен выглядить так: "Название тикета компании на биржи. Число"


In [2]:
def get_base_url(name, start= BASE_URL):
    return start+name

In [3]:
def get_question_context(file_path = QUESTION_CONTEXT):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            file_content = file.read()
            return file_content
    except FileNotFoundError:
        print(f"The file {file_path} does not exist.")
    except Exception as e:
        print(f"An error occurred: {e}")

In [11]:
class Article:
    month_dict = {
    'января': '01',
    'февраля': '02',
    'марта': '03',
    'апреля': '04',
    'мая': '05',
    'июня': '06',
    'июля': '07',
    'августа': '08',
    'сентября': '09',
    'октября': '10',
    'ноября': '11',
    'декабря': '12'
    }
    
    def __init__(self, date, text, from_json=False):#, likes):
        if from_json:
            self.date = date
            self.text = text
        else:            
            self.date = self.fix_dates(date)
            self.text = self.clean_text(text)
            self.ticker, self.prediction = 0, 0

    def clean_text(self, text):
        emoji_pattern = re.compile("["
                                u"\U0001F600-\U0001F64F"  # emoticons
                                u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                                u"\U0001F680-\U0001F6FF"  # transport & map symbols
                                u"\U0001F700-\U0001F77F"  # alchemical symbols
                                u"\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
                                u"\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
                                u"\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
                                u"\U0001FA00-\U0001FA6F"  # Chess Symbols
                                u"\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
                                u"\U00002702-\U000027B0"  # Dingbats
                                u"\U000024C2-\U0001F251" 
                                "]+", flags=re.UNICODE)
        clean_text = emoji_pattern.sub(r'', text)
        return clean_text
    
    def fix_dates(self, date_string):
        if 'Сегодня' in date_string:
            result = datetime.combine(datetime.now().date(), 
                                    #   (datetime.combine(datetime.today(), datetime.strptime(date_string.split(" ")[-1], '%H:%M').time()) - timedelta(hours=3)).time()).strftime("%Y-%m-%d %H:%M")
                                      (datetime.combine(datetime.today(), datetime.strptime(date_string.split(" ")[-1], '%H:%M').time())).time()).strftime("%Y-%m-%d %H:%M")

        elif 'Вчера' in date_string:
            result = datetime.combine(datetime.now().date() - timedelta(days=1), 
                                    #   (datetime.combine(datetime.today(), datetime.strptime(date_string.split(" ")[-1], '%H:%M').time()) - timedelta(hours=3)).time()).strftime("%Y-%m-%d %H:%M")
                                      (datetime.combine(datetime.today(), datetime.strptime(date_string.split(" ")[-1], '%H:%M').time())).time()).strftime("%Y-%m-%d %H:%M")

        else:
            result_list = date_string.split(" ")
            result_list[1] = self.month_dict[result_list[1]]
            result_list.pop(3)

            # result_list[-1]=str((int(result_list[-1].split(':')[0])+21)%24) + ":" + result_list[-1].split(':')[1]
            date_str = ' '.join(result_list)
            date_format = '%d %m %Y %H:%M'
            result = datetime.strptime(date_str, date_format).strftime("%Y-%m-%d %H:%M")

        return result
    
    def predict(self):
        question_context = get_question_context()
        client = OpenAI(api_key=DS_API_KEY, base_url="https://api.deepseek.com/v1")
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[
                {"role": "user", "content": question_context},
                {"role": "user", "content": self.text},
                {"role": "user", "content": 'Напиши краткий ответ в одну строчку. Только название одной компании и направлении цены, больше ничего, два слова через пробел без пояснений'}
                ]
            )
        answer = response.choices[0].message.content
        answer = answer.strip().upper()
        prediction = 0
        ticker = 0
        if "DOWN" in answer:
            prediction = -1
            answer = answer.replace("DOWN", "")
        elif "UP" in answer:
            prediction = 1
            answer = answer.replace("UP", "")
        else:
            return 0, 0
        if len(answer) < 20:
            ticker = re.sub(r'[+\-=:/]', '', answer)             
        self.ticker, self.prediction =  ticker, prediction

In [12]:
class Authors:
    def __init__(self, authors_list=[]):
        self.authors_list = authors_list

    def add_author(self, author):
        self.authors_list.append(author)

    def save_to_json(self, filename):
        author_data_list = []
        for author in self.authors_list:
            author_data = {
                "name": author.name,
                "base_url": author.base_url,
                "subscribers": int(author.subscribers),
                "size": author.size,
                "month_activity": int(author.month_activity),
                "profitability": author.profitability,
                "articles": [{"date": article.date, "text": article.text} for article in author.articles],
            }
            author_data_list.append(author_data)

        with open(filename, "w", encoding="utf-8") as json_file:
            json.dump(author_data_list, json_file, ensure_ascii=False, indent=4)

In [13]:
class Author:
    def __init__(self, name: str, subscribers, size, month_activity, profitability, articles):
        self.name = name
        self.base_url = self.get_base_url(name)
        self.subscribers = subscribers 
        self.size = size
        self.month_activity = month_activity
        self.profitability = profitability
        self.articles  = articles

    def get_subscribers():
        pass
    def get_profitability():
        pass
    def get_base_url(self, name, start= BASE_URL):
        return start+name

In [14]:
with open('data_posts.json', 'r', encoding='utf-8') as file:
    json_content = file.read()
    data = json.loads(json_content)

{'vadya93': {'base_url': 'https://www.tinkoff.ru/invest/social/profile/vadya93', 'subscribers': 94624, 'size': 'от 10000000', 'month_activity': 27510, 'profitability': 246.38, 'articles': [{'date': '2024-04-11 06:54', 'text': 'Рынок смотрится неплохо на фоне высказываний Набиуллиной\n\nДрузья, я продвигаюсь в решении своих вопросов, все уже сделал, денег потратил, так что сил появилось сильно больше и снова могу вам вместо завтрака тут что-то писать \n\nПоследние дни у нас росли преимущественно лишь ребята из первого эшелона, а все остальное стояло на месте, но вчера после Набиуллиной чутка стало повеселее и рынок рос уже широким фронтом.\n\nЧто я приметил:\n1. $ABRD  - подошла к свой древней отметке, которая была давным-давно. Я сам набрал небольшую (!) позицию с переносом, но поджим очень красивый. Там засело пару каналов, которые упорно не хотели закрывать в минус, это может тормозить рост, но в целом, они ждут выше, так что можем и порасти.\n\n2. $RUAL  не падал вчера даже на фоне 

In [33]:
for author in AUTHORS:
    print(author)
    new_articles = []
    updated_data = []
    # парсинг страницы
    url = get_base_url(author)
    headers = {'User-Agent': USER_AGENT}
    response = requests.get(url, headers=headers)
    response.encoding = 'utf-8'
    soup = BeautifulSoup(response.text, "html")
    posts = soup.find_all('div', {'data-qa-tag':'PulsePost'})

    for post in posts:
        text = post.find_all('div', {'class':'pulse-profilepage__fGGBmY'})[0].get_text()
        date = post.find_all('div', {'pulse-profilepage__cSULlZ'})[0].get_text()
        article = Article(date, text, from_json=False)

        # сравнение данных
        dates = [item['date'] for item in data[author]['articles']]


        if not (article.date in dates):
            print("Новый пост")
            article.prediction()
            d =  {
                    'date': article.date,
                    'text': article.text,
                    'ticker': article.ticker,
                    'prediction': article.prediction
                    }
            new_articles.append(d)
        else:
            print("Старый пост")
            break
    if len(new_articles) > 0:
        print("Есть новые посты")
        data[author]['articles'] = new_articles + data[author]['articles']
    else:
        print("Нет новых постов")

vadya93
2024-04-11 06:54
Старый пост
Poltrader
2024-04-10 15:27
Старый пост


In [6]:
df = pd.read_json("data_posts.json")
df = df.explode('articles')
df.reset_index(inplace=True, drop=True)

df['date'] = df['articles'].apply(lambda x: x.pop('date'))
df['text'] = df['articles'].apply(lambda x: x.pop('text'))
df.drop(columns=['articles'], inplace=True)

In [8]:
for author in AUTHORS:
    print(author)
    new_articles = []
    updated_data = []
    # парсинг страницы
    url = get_base_url(author)
    headers = {'User-Agent': USER_AGENT}
    response = requests.get(url, headers=headers)
    response.encoding = 'utf-8'
    soup = BeautifulSoup(response.text, "html")
    posts = soup.find_all('div', {'data-qa-tag':'PulsePost'})
    for post in posts:


        text = post.find_all('div', {'class':'pulse-profilepage__fGGBmY'})[0].get_text()
        date = post.find_all('div', {'pulse-profilepage__cSULlZ'})[0].get_text()
        article = Article(date, text)

        print(article.date)
        # сравнение данных
        if not (article.date in list(df.date)):
            print("Новый пост")
            new_articles.append(article)
        else:
            print("Старый пост")
            break


    filtered_data = df.loc[df['name']==author,['date', 'text']]
    articles = [Article(date, text, from_df=True) for date, text in zip(filtered_data['date'], filtered_data['text'])]
    articles = new_articles + articles


    name=author
    subscribers=df.loc[df['name']==author,].iloc[0,2]
    size=df.loc[df['name']==author,].iloc[0,3]
    month_activity=df.loc[df['name']==author,].iloc[0,4]
    profitability=df.loc[df['name']==author,].iloc[0,5]
    articles=articles
    
    updated_data.append(Author(name, subscribers, size, month_activity, profitability, articles))
    
        

artydevCo
2024-04-09 10:35
Старый пост


In [9]:
a = Authors(updated_data)

a.save_to_json("data_posts.json")