In [None]:
from google.colab import drive
drive.mount('/content/drive')
import numpy as np
import pandas as pd

Mounted at /content/drive


### 爬蟲

爬蟲套件

In [None]:
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import time
import progressbar

爬蟲main function

In [None]:
class PTTScraper:
    """Scraper for one board of the PTT web BBS (https://www.ptt.cc).

    The scraper walks a board's index pages from newest to oldest, collects
    links to posts (optionally filtered by title keywords) and downloads each
    post together with its pushes (comments).
    """

    base_url = "https://www.ptt.cc"
    # Seconds to wait for PTT before a request is abandoned; without a timeout
    # a stalled connection would block the crawl forever.
    request_timeout = 10
    # (output key, CSS class of the <span>) for every field of a push.
    _PUSH_FIELDS = (
        ("Tag", "push-tag"),
        ("Userid", "push-userid"),
        ("Content", "push-content"),
        ("Ipdatetime", "push-ipdatetime"),
    )

    def __init__(self, _board):
        """Point the scraper at the newest index page of board ``_board``."""
        self.base_url = PTTScraper.base_url
        self.url = self.base_url + f"/bbs/{_board}/index.html"

    def get_post_content(self, post_url):
        """Download one post and return ``(content, push_list)``.

        :param post_url: path of the post relative to ``base_url``
        :return: the text of the ``main-content`` element (``None`` when the
                 page has no such element) and the list of parsed pushes
        """
        soup = PTTScraper.get_soup(self.base_url + post_url)
        main = soup.find(id='main-content')
        content = main.text if main is not None else None
        # Parsing pushes is pure in-memory work, so a plain loop is used
        # instead of spinning up a thread pool per post.
        push_list = [self.get_push(push) for push in soup.find_all('div', class_='push')]
        return content, push_list

    def get_push(self, push):
        """Convert one ``div.push`` element into a dict.

        :param push: element exposing ``find('span', class_=...)``
        :return: dict with keys ``Tag``, ``Userid``, ``Content`` and
                 ``Ipdatetime``; an empty dict when any of the four spans is
                 missing (e.g. system notices inside the push area)
        """
        fields = {}
        for key, css_class in self._PUSH_FIELDS:
            span = push.find('span', class_=css_class)
            if span is None:
                return {}
            fields[key] = span.text.strip()
        # The content span starts with the ":" separator shown on the page.
        fields["Content"] = fields["Content"].lstrip(":")
        return fields

    @staticmethod
    def get_soup(url):
        """Fetch ``url`` and return it parsed as a ``BeautifulSoup`` object.

        The ``over18`` cookie is sent so that age-restricted boards return
        their content instead of the confirmation page.

        :raises requests.RequestException: on network errors or timeout
        """
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/58.0.3029.110 Safari/537.3", }
        cookies = {"over18": "1"}
        response = requests.get(url, headers=headers, cookies=cookies,
                                timeout=PTTScraper.request_timeout)
        return BeautifulSoup(response.text, 'html.parser')

    @staticmethod
    def _parse_index_date(text, today=None):
        """Turn an index-page date such as ``" 1/05"`` into a ``datetime``.

        Index pages only show month/day, so the year is inferred as the most
        recent year in which that date is not in the future (one day of slack
        is allowed for time-zone differences). Posts older than one year can
        therefore not be told apart from newer ones.

        :param text: ``month/day`` string, surrounding whitespace allowed
        :param today: reference "now"; defaults to ``datetime.now()``
        :raises ValueError: when ``text`` is not a valid ``month/day``
        """
        today = today or datetime.now()
        month, day = (int(part) for part in text.strip().split('/'))
        latest_allowed = today + timedelta(days=1)
        # Look back far enough to always hit a leap year for 2/29.
        for year in range(today.year, today.year - 9, -1):
            try:
                candidate = datetime(year, month, day)
            except ValueError:
                continue  # e.g. 2/29 in a non-leap year
            if candidate <= latest_allowed:
                return candidate
        raise ValueError(f"cannot interpret index date {text!r}")

    def fetch_post(self, url):
        """Download one post and return it as a record.

        :param url: path of the post relative to ``base_url``
        :return: dict with keys ``Title``, ``Author``, ``Date``, ``Content``,
                 ``Link`` and ``Pushes``. Fields that cannot be extracted are
                 ``None`` (``Pushes`` is an empty list) so that one broken
                 post never aborts the whole crawl.
        """
        record = {'Title': None, 'Author': None, 'Date': None, 'Content': None,
                  'Link': url, 'Pushes': []}
        try:
            soup = PTTScraper.get_soup(self.base_url + url)
        except requests.RequestException as e:
            print(self.base_url + url)
            print(e)
            return record

        # Extract post information
        try:
            main = soup.find(id='main-content')
            if main is not None:
                # Everything after the "※ 發信站" footer is signature/pushes.
                record['Content'] = main.text.split('※ 發信站')[0]
            meta = soup.find_all(class_='article-meta-value')
            if meta:
                record['Author'] = meta[0].text
                record['Title'] = meta[-2].text
                record['Date'] = datetime.strptime(meta[-1].text, '%a %b %d %H:%M:%S %Y')
        except Exception as e:
            # Best effort: keep whatever was extracted before the failure.
            print(self.base_url + url)
            print(e)

        # Extract comments
        record['Pushes'] = [self.get_push(push) for push in soup.find_all('div', class_='push')]
        return record

    def get_data_current_page(self, soup=None, until_date=None, *args,
                              max_posts=1000, links_num=0):
        """Collect and download the matching posts of one index page.

        :param soup: parsed index page; ``self.url`` is fetched when omitted
        :param until_date: posts dated before this day end the crawl;
                           defaults to today (evaluated at call time)
        :param args: keywords that must ALL appear in a post title
        :param max_posts: overall cap on the number of collected posts
        :param links_num: number of posts already collected on earlier pages
        :return: ``(posts, reached, count)`` - the downloaded posts, whether
                 the date/amount limit was reached on this page, and the
                 number of links taken from this page
        """
        if until_date is None:
            until_date = datetime.now()
        until_date = until_date.replace(hour=0, minute=0, second=0, microsecond=0)

        if soup is None:
            soup = PTTScraper.get_soup(self.url)

        # Both branches yield the entries newest first.
        separator = soup.find('div', {'class': 'r-list-sep'})
        if separator is None:
            entries = reversed(soup.select('.r-ent'))
        else:
            # Only entries above the separator are regular posts (the ones
            # below are pinned). previous_siblings already walks upwards,
            # i.e. from the newest regular post to the oldest.
            entries = [element for element in separator.previous_siblings if
                       element.name == 'div' and 'r-ent' in element.get('class', [])]

        today = datetime.now()
        links = []
        reach = False
        last_seen_date = None
        for entry in entries:
            title_div = entry.find('div', 'title')
            anchor = title_div.a if title_div is not None else None
            date_div = entry.find('div', 'date')
            if anchor is None or date_div is None:
                continue  # deleted post or malformed row
            href = anchor.get('href')
            if not href:
                continue
            try:
                post_date = self._parse_index_date(date_div.text, today)
            except ValueError as e:
                print(e)
                continue
            last_seen_date = post_date
            # Check the limits before the keyword filter so the crawl also
            # stops when the first too-old post does not match the keywords.
            if len(links) + links_num >= max_posts or post_date < until_date:
                reach = True
                break
            title = anchor.text.strip()
            if all(keyword in title for keyword in args):
                links.append(href)

        if last_seen_date is None:
            return [], False, 0
        print(last_seen_date)  # progress: oldest date examined on this page
        # Downloading posts is network bound, so it is done concurrently.
        with ThreadPoolExecutor() as executor:
            data = list(executor.map(self.fetch_post, links))
        return data, reach, len(links)

    def get_data_until(self, until_date, *args, max_posts=1000):
        """Collect all posts dated on or after ``until_date``.

        :param until_date: ``datetime`` or ``'month/day'`` string (the most
                           recent such date that is not in the future)
        :param args: keywords that must all appear in a post title
        :param max_posts: maximum number of posts to collect
        :return: list of post records, newest first
        """
        if isinstance(until_date, datetime):
            date = until_date
        else:
            date = self._parse_index_date(until_date)

        data = []
        links_num = 0
        # Walk with a local URL so the scraper can be reused afterwards.
        page_url = self.url
        while links_num < max_posts:
            soup = PTTScraper.get_soup(page_url)
            data_curr, date_end, num = self.get_data_current_page(soup, date, *args,
                                                                  max_posts=max_posts, links_num=links_num)
            data.extend(data_curr)
            if date_end:
                break
            links_num += num

            # Find the link to the previous (older) page
            prev_link = soup.find('a', string='‹ 上頁')
            prev_href = prev_link.get('href') if prev_link is not None else None
            if not prev_href:
                break  # oldest page reached, or not a regular index page
            page_url = self.base_url + prev_href
        return data

    def get_data_days_before(self, delta_days, *args, max_posts=1000):
        """Collect the posts of the last ``delta_days`` days.

        :param delta_days: how many days to look back from now
        :param args: keywords that must all appear in a post title
        :param max_posts: maximum number of posts to collect
        :return: list of post records
        """
        after_date = datetime.now() - timedelta(days=delta_days)
        return self.get_data_until(after_date, *args, max_posts=max_posts)

    def get_title_and_before_days(self, *args, delta_days, max_posts=1000):
        """Keyword-first alias of :meth:`get_data_days_before`."""
        return self.get_data_days_before(delta_days, *args, max_posts=max_posts)

指令function

In [None]:
def crawl_chat_threads(session_keyword, board="Stock", delta_days=350, max_posts=1000):
    """Crawl the "[閒聊]" threads of ``board`` whose title contains ``session_keyword``.

    Prints the elapsed crawl time in seconds and returns the list of post
    records produced by ``PTTScraper.get_title_and_before_days``.

    :param session_keyword: extra keyword the title must contain (e.g. "盤後")
    :param board: PTT board name
    :param delta_days: how many days to look back from now
    :param max_posts: maximum number of posts to collect
    """
    scraper = PTTScraper(board)
    begin = time.time()  # only used to report how long the crawl took
    posts = scraper.get_title_and_before_days(session_keyword, "[閒聊]",
                                              delta_days=delta_days, max_posts=max_posts)
    print(time.time() - begin)
    return posts


# Usage
if __name__ == "__main__":
    # After-market ("盤後") chat threads.
    dataA = crawl_chat_threads("盤後")
    dfA = pd.DataFrame(dataA)
    print(dfA)

    # Intraday ("盤中") chat threads.
    # Note: the push text is not cleanly separated from 'Content' here, but
    # reading the 'Pushes' column directly appears to be fine.
    dataD = crawl_chat_threads("盤中")
    dfD = pd.DataFrame(dataD)
    print(dfD)

過濾function

In [None]:
# Titles that must not appear in each frame: the intraday frame (dfD) should
# not contain after-market threads and vice versa.
words_to_removeD = ['盤後閒聊']
words_to_removeA = ['盤中閒聊']

# Posts whose title or date could not be scraped cannot be matched against
# the keyword list nor placed on the date axis later, so drop them first.
dfD = dfD.dropna(subset=['Title', 'Date'])
dfA = dfA.dropna(subset=['Title', 'Date'])

# Use boolean indexing to delete rows containing the specified words.
# na=False keeps the mask strictly boolean so that `~` cannot fail on NaN.
dfD = dfD[~dfD['Title'].str.contains('|'.join(words_to_removeD), na=False)]
dfA = dfA[~dfA['Title'].str.contains('|'.join(words_to_removeA), na=False)]

# Reset the index so that rows are numbered 0..n-1 again.
dfD = dfD.reset_index(drop=True)
dfA = dfA.reset_index(drop=True)

# Display the modified DataFrames
print("這是盤後閒聊")
print(dfA.head())
print("這是盤中閒聊")
print(dfD.head())


### 斷詞套件

外部下載

In [None]:
!pip install -U ckiptagger[tf,gdown]
from ckiptagger import data_utils, construct_dictionary, WS, POS, NER

In [None]:
pip install opencc-python-reimplemented

內部colab連接

In [None]:
import os

from google.colab import drive

# The CKIP model files are read from Google Drive, so mount it first.
drive.mount('/content/drive')
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Build the three ckiptagger objects (word segmentation, POS, NER) from the
# same data directory, in that order.
ws, pos, ner = (
    tagger("/content/drive/MyDrive/專題code/data", disable_cuda=False)
    for tagger in (WS, POS, NER)
)

Mounted at /content/drive


NameError: ignored

引入新字典及正負面分隔

In [None]:
# Load the sentiment lexicon and split it by the sign of the score column.
opinion = pd.read_excel('/content/drive/MyDrive/專題code/程式碼的家/opinion.xlsx')
opinion_positive = opinion.loc[lambda lexicon: lexicon['情緒分數'] > 0]
opinion_negative = opinion.loc[lambda lexicon: lexicon['情緒分數'] < 0]
opinion_negative

### 資料整理

日期篩取

In [None]:
# Post timestamps of the after-market frame, plus the same dates as
# 'YYYY-MM-DD' strings.
after_date_list = dfA['Date']
after_date_list_clear = [
    stamp.date().strftime('%Y-%m-%d') for stamp in after_date_list
]

# Same for the intraday frame.
middle_date_list = dfD['Date']
middle_date_list_clear = [
    stamp.date().strftime('%Y-%m-%d') for stamp in middle_date_list
]
middle_date_list_clear

建立分數表
row為日期
column為特徵

In [None]:
# Empty score table: one row per date string, one column per feature.
# The index is taken from whichever of the two date lists is longer
# (the intraday list wins a tie).
sentiment_df = pd.DataFrame(
    columns=['y_m_+', 'y_m_-', 'y_m_t', 'y_a_+', 'y_a_-', 'y_a_t'],
    index=(
        after_date_list_clear
        if len(after_date_list_clear) > len(middle_date_list_clear)
        else middle_date_list_clear
    ),
)

### 分數計算

In [None]:
from collections import Counter
from itertools import chain

In [None]:
def push_sentiment_calculator(Pushes, date, date_tommow, session=None):
    """Score the pushes of one post and store the result in ``sentiment_df``.

    The push texts are segmented with the global ``ws`` tagger. Every word of
    ``opinion_positive`` / ``opinion_negative`` that occurs at least once in
    the segmented text contributes its score once (occurrences are not
    counted). The positive sum, negative sum and their total are written to
    the row ``date_tommow`` of the global ``sentiment_df``.

    :param Pushes: iterable of push dicts; the text is read from ``"Content"``
    :param date: date of the post itself (not used by the calculation)
    :param date_tommow: row label of ``sentiment_df`` that receives the scores
    :param session: ``'m'`` to fill only the ``y_m_*`` columns, ``'a'`` to
                    fill only the ``y_a_*`` columns. ``None`` (default) fills
                    both sets, which is the legacy behaviour; note that in
                    this mode scoring a second session overwrites the first.
    :return: total sentiment score (positive sum + negative sum)
    :raises ValueError: if ``session`` is not ``None``, ``'m'`` or ``'a'``
    """
    if session is None:
        prefixes = ('m', 'a')
    elif session in ('m', 'a'):
        prefixes = (session,)
    else:
        raise ValueError(f"session must be None, 'm' or 'a', got {session!r}")

    # Pushes that failed to parse are empty dicts; skip them instead of
    # feeding a placeholder string to the segmenter.
    contents_only = [push["Content"] for push in Pushes if push.get("Content")]

    if contents_only:
        word_sentence_list1 = ws(
            contents_only,
            sentence_segmentation=True,  # To consider delimiters
            segment_delimiter_set={",", "。", ":", "?", "!", ";"},  # This is the default set of delimiters
        )
    else:
        word_sentence_list1 = []
    # Only presence matters, so a set gives constant-time membership tests.
    vocabulary = set(chain(*word_sentence_list1))

    positive_hits = opinion_positive['情緒字詞'].isin(vocabulary)
    negative_hits = opinion_negative['情緒字詞'].isin(vocabulary)
    Positive_grades = opinion_positive.loc[positive_hits, '情緒分數'].sum()
    Negative_grades = opinion_negative.loc[negative_hits, '情緒分數'].sum()
    sentiment_grade = Positive_grades + Negative_grades

    for prefix in prefixes:
        sentiment_df.loc[date_tommow, f'y_{prefix}_+'] = Positive_grades
        sentiment_df.loc[date_tommow, f'y_{prefix}_-'] = Negative_grades
        sentiment_df.loc[date_tommow, f'y_{prefix}_t'] = sentiment_grade

    print(sentiment_grade)
    return sentiment_grade

執行情緒特徵表填入

In [None]:
# Score every post except the first one in each list and store the result
# under the date that precedes it in the list (passed as `date_tommow`).
# NOTE(review): push_sentiment_calculator is called the same way for both
# frames, and as called here it fills the y_m_* and the y_a_* columns alike,
# so the second loop overwrites values written by the first on shared dates.
# Confirm which columns each session is supposed to fill.
for row, (next_day, day) in enumerate(
        zip(after_date_list_clear, after_date_list_clear[1:]), start=1):
    push_sentiment_calculator(dfA['Pushes'][row], day, next_day)

for row, (next_day, day) in enumerate(
        zip(middle_date_list_clear, middle_date_list_clear[1:]), start=1):
    push_sentiment_calculator(dfD['Pushes'][row], day, next_day)

In [None]:
# Display the sentiment score table after the loops above have filled it.
sentiment_df