# Ceneo Scraper

## Struktura opinii w serwisie ceneo.pl

|składowa|zmienna|selektor|
|--------|-------|--------|
|identyfikator opinii|opinion_id|["data-entry-id"]|
|autora|author|.user-post__author-name|
|rekomendacje|recommandation|.user-post__author-recomendation > em|
|liczbę gwiazdek|rating|.user-post__score-count|
|treść opinii|content|.user-post__text|
|listę zalet|pros|.review-feature__title--positives ~ .review-feature__item|
|listę wad|cons|.review-feature__title--negatives ~ .review-feature__item|
|data wystawienia|opinion_date|.user-post__published > time:nth-child(1)["datetime"]|
|data zakupu produktu|purchase_date|.user-post__published > time:nth-child(2)["datetime"]|
|ile osób uznało opinię za przydatną|likes|button.vote-yes > span|
|ile osób uznało opinię za nieprzydatną|dislikes|button.vote-no > span|

In [None]:
# Maps each opinion field name to the positional arguments passed to
# extract() after the opinion element: (selector[, attribute[, return_list]]).
# A selector of None means the value is read from the opinion element itself.
selectors = {
    'author': (".user-post__author-name",),
    'recommandation': (".user-post__author-recomendation > em",),
    'rating': (".user-post__score-count",),
    'content': (".user-post__text",),
    # The sibling part needs an explicit "div." prefix: without it the token
    # is parsed as a tag name and the selector never matches anything.
    'pros': ("div.review-feature__title--positives ~ div.review-feature__item", None, True),
    'cons': ("div.review-feature__title--negatives ~ div.review-feature__item", None, True),
    'opinion_date': (".user-post__published > time:nth-child(1)", 'datetime'),
    'purchase_date': (".user-post__published > time:nth-child(2)", 'datetime'),
    'likes': ("button.vote-yes > span",),
    'dislikes': ("button.vote-no > span",),
    'opinion_id': (None, "data-entry-id"),
}

1. Import bibliotek

In [None]:
import base64
import io
import json
import os

import matplotlib.pyplot as plt
import pandas as pd
import requests
from bs4 import BeautifulSoup
from flask import Flask, abort, render_template, request, session, send_file, jsonify, redirect, url_for
from matplotlib.figure import Figure
from openpyxl.workbook import Workbook
from xlsxwriter import Workbook

2. Funkcja do ekstrakcji zawartości ze strony HTML

In [None]:
def extract(ancestor, selector, attribute=None, return_list=False):
    """Read text or an attribute value from a parsed HTML element.

    Args:
        ancestor: Element to read from. It must provide ``select``,
            ``select_one``, ``get_text`` and item access for attributes
            (this file passes BeautifulSoup objects).
        selector: CSS selector evaluated relative to ``ancestor``. A falsy
            value means the value is read from ``ancestor`` itself.
        attribute: Name of the attribute to read. When falsy, the element's
            text is returned instead.
        return_list: When true, return one value per element matched by
            ``selector`` instead of a single value.

    Returns:
        A list of stripped strings when ``return_list`` is true. Otherwise a
        stripped string, or ``None`` when the element or the requested
        attribute is missing.
    """
    if return_list:
        matches = ancestor.select(selector)
        if attribute:
            return [tag[attribute].strip() for tag in matches]
        return [tag.get_text().strip() for tag in matches]

    target = ancestor.select_one(selector) if selector else ancestor
    if target is None:
        return None
    if attribute:
        try:
            return target[attribute].strip()
        except (KeyError, TypeError):
            # KeyError: the element exists but does not carry the attribute.
            # Treat it the same as a missing element instead of crashing.
            return None
    return target.get_text().strip()

3. Funkcja do wczytania pliku z pobranymi opiniami

In [None]:
def load_opinions(product_id):
    """Return the parsed contents of ``opinions/<product_id>.json``.

    The file is opened relative to the current working directory and read
    as UTF-8. Errors from ``open`` or ``json.load`` propagate to the caller.
    """
    source_path = f'opinions/{product_id}.json'
    with open(source_path, 'r', encoding='UTF-8') as handle:
        stored_opinions = json.load(handle)
    return stored_opinions

4. Funkcja do analizy opinii (dane potrzebne do strony z listą produktów)

In [None]:
def analyze_opinions(opinions):
    """Compute the summary figures shown on the product list page.

    Args:
        opinions: Sequence of opinion dicts. The 'pros' and 'cons' entries
            are sized with ``len``; 'rating' is parsed as the part before
            the first '/' with ',' accepted as the decimal separator
            (e.g. '4,5/5').

    Returns:
        Tuple ``(num_opinions, num_pros, num_cons, avg_rating)``.
        ``avg_rating`` is 0 when no rating could be parsed.
    """
    num_opinions = len(opinions)
    num_pros = sum(len(opinion.get('pros') or []) for opinion in opinions)
    num_cons = sum(len(opinion.get('cons') or []) for opinion in opinions)

    ratings = []
    for opinion in opinions:
        rating = opinion.get('rating')
        if not rating:
            continue
        try:
            ratings.append(float(str(rating).split('/')[0].replace(',', '.')))
        except ValueError:
            # Skip only the malformed entry; one bad rating must not wipe
            # out the average of all the valid ones.
            continue

    avg_rating = sum(ratings) / len(ratings) if ratings else 0
    return num_opinions, num_pros, num_cons, avg_rating

5. Stworzenie linków do pobrania plików w formatach .csv, .xlsx, .json

In [None]:
def create_download_link(product_id, file_format):
    """Serialise a product's stored opinions into an in-memory download.

    Args:
        product_id: Identifier passed to ``load_opinions`` and used as the
            base name of the download.
        file_format: One of 'csv', 'xlsx' or 'json'.

    Returns:
        Tuple ``(buffer, mime_type, file_name)`` where ``buffer`` is an
        ``io.BytesIO`` rewound to position 0.

    Raises:
        ValueError: If ``file_format`` is not supported. Previously the
            function fell through and returned ``None``, which made the
            caller fail on tuple unpacking.
    """
    # Validate first so an unsupported format fails before any file is read.
    if file_format not in ('csv', 'xlsx', 'json'):
        raise ValueError(f'Unsupported file format: {file_format!r}')

    opinions = load_opinions(product_id)
    output = io.BytesIO()
    if file_format == 'csv':
        # utf-8-sig writes a BOM at the start of the CSV output.
        pd.DataFrame(opinions).to_csv(output, index=False, encoding='utf-8-sig')
        mime_type = 'text/csv'
    elif file_format == 'xlsx':
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            pd.DataFrame(opinions).to_excel(writer, index=False, sheet_name='Opinions')
        mime_type = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    else:
        output.write(json.dumps(opinions, indent=4, ensure_ascii=False).encode('UTF-8'))
        mime_type = 'application/json'
    output.seek(0)
    return output, mime_type, f'{product_id}.{file_format}'

6. Funkcja do tworzenia wykresu z rekomendacjami

In [None]:
def generate_recommendation_chart(opinions):
    """Render a pie chart of recommendation shares as a base64 PNG string.

    Args:
        opinions: Iterable of opinion dicts; each must have a
            'recommandation' key. Falsy values are left out of the chart.

    Returns:
        The PNG image encoded as a base64 ``str``.
    """
    recommendation_data = {}
    for opinion in opinions:
        recommendation = opinion['recommandation']
        if recommendation:
            recommendation_data[recommendation] = recommendation_data.get(recommendation, 0) + 1

    # Build the Figure directly instead of through pyplot: pyplot keeps
    # process-wide "current figure" state, which is unsafe in request
    # handlers that may run concurrently or outside the main thread.
    fig = Figure()
    ax = fig.subplots()
    ax.pie(
        list(recommendation_data.values()),
        labels=list(recommendation_data.keys()),
        autopct='%1.1f%%',
    )
    ax.set_title('Udział poszczególnych rekomendacji')

    output = io.BytesIO()
    fig.savefig(output, format='png')
    return base64.b64encode(output.getvalue()).decode('utf-8')

7. Funkcja do tworzenia wykresu z ocenami

In [None]:
def generate_rating_chart(opinions):
    """Render a bar chart of opinion counts per rating as a base64 PNG string.

    Args:
        opinions: Iterable of opinion dicts; each must have a 'rating' key.
            The part of the rating before the first '/' is used as the bar
            label. Falsy ratings are ignored.

    Returns:
        The PNG image encoded as a base64 ``str``.
    """
    rating_data = {}
    for opinion in opinions:
        rating = opinion['rating']
        if rating:
            stars = rating.split('/')[0]
            rating_data[stars] = rating_data.get(stars, 0) + 1

    def numeric_order(stars):
        # Order bars by numeric value (',' accepted as decimal separator);
        # labels that are not numbers are placed after all numeric ones.
        try:
            return (0, float(stars.replace(',', '.')), stars)
        except ValueError:
            return (1, 0.0, stars)

    # Without sorting, bars appear in the order ratings were first seen.
    labels = sorted(rating_data, key=numeric_order)

    # Build the Figure directly instead of through pyplot: pyplot keeps
    # process-wide "current figure" state, which is unsafe in request
    # handlers that may run concurrently or outside the main thread.
    fig = Figure()
    ax = fig.subplots()
    ax.bar(labels, [rating_data[label] for label in labels])
    ax.set_title('Liczba opinii z poszczególnymi liczbami gwiazdek')

    output = io.BytesIO()
    fig.savefig(output, format='png')
    return base64.b64encode(output.getvalue()).decode('utf-8')

8. Flask i routing

In [None]:
app = Flask(__name__)
# Key Flask uses to sign the session cookie. Read it from the SECRET_KEY
# environment variable so deployments do not have to ship the value that is
# committed in the source; the previous literal is kept as the fallback.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'loki')

@app.route('/')
@app.route('/home')
def home_page():
    """Serve the landing page for both '/' and '/home'."""
    template_name = 'home.html'
    return render_template(template_name)

def _scrape_opinions(product_id, timeout=10):
    """Collect the opinions for a product, following pagination links.

    Scraping stops at the first page that cannot be fetched (network error
    or a non-200 status); whatever was collected up to that point is
    returned, so a failure on the first page yields an empty list.
    """
    url = f'https://www.ceneo.pl/{product_id}#tab=reviews'
    all_opinions = []
    while url:
        print(url)
        try:
            # A timeout keeps a stalled connection from blocking the request
            # handler indefinitely.
            response = requests.get(url, timeout=timeout)
        except requests.RequestException:
            break
        if response.status_code != 200:
            break

        page = BeautifulSoup(response.text, 'html.parser')
        for opinion in page.select('div.js_product-review'):
            all_opinions.append({
                key: extract(opinion, *value)
                for key, value in selectors.items()
            })

        try:
            next_page_url = extract(page, 'a.pagination__next', 'href')
        except (TypeError, KeyError):
            next_page_url = None
        url = 'https://www.ceneo.pl' + next_page_url if next_page_url else None
    return all_opinions


# opinion extraction page
@app.route('/opinions', methods=['GET', 'POST'])
def opinions_page():
    """Show the extraction form and, on POST, scrape and store the opinions.

    On POST the product id is read from the form, the opinions are scraped
    and written to ``opinions/<product_id>.json``, and the id is stored in
    the session. When the id is empty, or no opinions could be obtained, an
    error message is passed to the template and nothing is written.
    """
    data = []
    product_id = None
    error_message = None

    # read the product id submitted through the form
    if request.method == 'POST':
        product_id = request.form['product_id'].strip()
        if not product_id:
            error_message = 'Nie wpisano kodu produktu'
        else:
            # The id ends up in a URL and in a file path, so accept only
            # letters and digits: this rules out path separators and "..".
            all_opinions = _scrape_opinions(product_id) if product_id.isalnum() else []
            if all_opinions:
                # store the scraped opinions as a JSON file
                os.makedirs('opinions', exist_ok=True)
                with open(f'opinions/{product_id}.json', 'w', encoding='UTF-8') as jf:
                    json.dump(all_opinions, jf, indent=4, ensure_ascii=False)
            else:
                error_message = f'Nie można pobrać opinii o produkcie "{product_id}" lub produkt nie posiada żadnych opinii. Sprawdź, czy produkt o ID "{product_id}" istnieje.'

        # remember the requested product id in the session
        session['product_id'] = product_id

    return render_template('opinions_extract.html', product_id=product_id, data=data, error_message=error_message)

@app.route('/products', methods=['GET', 'POST'])
def products_page():
    """List every product with stored opinions together with summary figures.

    Each ``*.json`` file in the ``opinions`` directory is treated as one
    product. For every product the template receives its id, the values
    returned by ``analyze_opinions`` and the three download links.
    """
    path = 'opinions'
    try:
        product_files = sorted(os.listdir(path))
    except FileNotFoundError:
        # Nothing has been extracted yet, so the directory does not exist.
        product_files = []

    products_data = []
    for product_file in product_files:
        product_id, extension = os.path.splitext(product_file)
        # Ignore anything that is not "<id>.json" (e.g. hidden/system files).
        if extension != '.json' or not product_id:
            continue

        opinions = load_opinions(product_id)
        num_opinions, num_pros, num_cons, avg_rating = analyze_opinions(opinions)

        products_data.append({
            'product_id': product_id,
            'num_opinions': num_opinions,
            'num_pros': num_pros,
            'num_cons': num_cons,
            'avg_rating': avg_rating,
            'csv_link': f'/download/{product_id}/csv',
            'xlsx_link': f'/download/{product_id}/xlsx',
            'json_link': f'/download/{product_id}/json',
        })

    return render_template('products_list.html', products_data=products_data)

@app.route('/product/<product_id>', methods=['GET', 'POST'])
def product_page(product_id):
    """Show one product's opinions, optionally filtered and sorted.

    A POST may carry form fields 'column' and 'filter' to keep only the rows
    whose value in that column contains the filter text. The 'sort' query
    parameter names a column to sort by. Column names that are not present
    in the data are ignored. Responds with 404 when no opinions file exists
    for ``product_id``.
    """
    try:
        opinions = load_opinions(product_id)
    except FileNotFoundError:
        abort(404)
    df = pd.DataFrame(opinions)

    if request.method == 'POST':
        column = request.form.get('column')
        filter_value = request.form.get('filter')
        if column in df.columns and filter_value:
            values = df[column]
            # Compare against the string form so non-string cells (such as
            # lists) can be searched, and match the text literally so input
            # like "(" is not parsed as a regular expression.
            matches = values.astype(str).str.contains(filter_value, regex=False)
            # Missing values never match, as before.
            df = df[matches & values.notna()]

    sorted_column = request.args.get('sort')
    if sorted_column in df.columns:
        df = df.sort_values(by=[sorted_column])

    opinions = df.to_dict('records')
    return render_template('product.html', product_id=product_id, opinions=opinions, columns=df.columns)

@app.route('/product/<product_id>/charts')
def product_charts_page(product_id):
    """Show the recommendation and rating charts for one product.

    Both charts are passed to the template as base64-encoded PNG strings.
    Responds with 404 when no opinions file exists for ``product_id``.
    """
    try:
        opinions = load_opinions(product_id)
    except FileNotFoundError:
        abort(404)
    recommendation_chart = generate_recommendation_chart(opinions)
    rating_chart = generate_rating_chart(opinions)
    return render_template('product_charts.html', product_id=product_id, recommendation_chart=recommendation_chart, rating_chart=rating_chart)

@app.route('/download/<product_id>/<file_format>')
def download_file(product_id, file_format):
    """Send a product's opinions as a csv, xlsx or json attachment.

    Responds with 404 when the format is not one of the three supported
    ones or when no opinions file exists for ``product_id``.
    """
    # Reject unsupported formats up front; the URL segment is user-supplied.
    if file_format not in ('csv', 'xlsx', 'json'):
        abort(404)
    try:
        file_obj, mime_type, file_name = create_download_link(product_id, file_format)
    except FileNotFoundError:
        abort(404)
    return send_file(file_obj, mimetype=mime_type, as_attachment=True, download_name=file_name)

@app.route('/author')
def author_page():
    """Serve the static page about the author."""
    template_name = 'author.html'
    return render_template(template_name)

if __name__ == '__main__':
    # Debug mode stays on by default, as before, but can be switched off by
    # setting FLASK_DEBUG to 0/false/no; it should not be left enabled on a
    # publicly reachable server.
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').lower() not in ('0', 'false', 'no')
    # The reloader is disabled explicitly, as in the original call.
    app.run(debug=debug_enabled, use_reloader=False)

In [None]:
#product_id = '84514582'
# product_id = input('Podaj kod produktu z Ceneo.pl: ')
# url = f'https://www.ceneo.pl/{product_id}#tab=reviews'
# print(url)

4. Pobranie opinii o produkcie

In [None]:
# all_opinions = []
# while(url): 
#     response = requests.get(url)
#     page = BeautifulSoup(response.text, 'html.parser')
#     opinions = page.select('div.js_product-review')
#     for opinion in opinions:
#         single_opinion = {
#             key: extract(opinion, *value)
#                 for key, value in selectors.items()
#         }
#         all_opinions.append(single_opinion)
#     try:
#         url = 'https://www.ceneo.pl' + extract(page, 'a.pagination__next', 'href')
#     except TypeError:
#         url = None

5. Stworzenie folderu 'opinions' oraz pliku JSON zawierającego opinie

In [None]:
# if not os.path.exists('opinions'):
#     os.mkdir('opinions')
# with open(f'opinions/{product_id}.json', 'w', encoding='UTF-8') as jf:
#     json.dump(all_opinions, jf, indent=4, ensure_ascii=False)