Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various sys_net improvements #12858

Merged
merged 2 commits into from Oct 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .clang-format
Expand Up @@ -21,7 +21,7 @@ Cpp11BracedListStyle: true
IndentCaseLabels: false
SortIncludes: false
ReflowComments: true
AlignConsecutiveAssignments: true
AlignConsecutiveAssignments: false
AlignTrailingComments: true
AlignAfterOpenBracket: DontAlign
ConstructorInitializerAllOnOneLineOrOnePerLine: false
Expand Down
4 changes: 2 additions & 2 deletions rpcs3/Emu/Cell/Modules/sceNp.cpp
Expand Up @@ -1272,7 +1272,7 @@ error_code sceNpBasicAddFriend(vm::cptr<SceNpId> contact, vm::cptr<char> body, s

error_code sceNpBasicGetFriendListEntryCount(vm::ptr<u32> count)
{
sceNp.warning("sceNpBasicGetFriendListEntryCount(count=*0x%x)", count);
sceNp.trace("sceNpBasicGetFriendListEntryCount(count=*0x%x)", count);

auto& nph = g_fxo->get<named_thread<np::np_handler>>();

Expand All @@ -1299,7 +1299,7 @@ error_code sceNpBasicGetFriendListEntryCount(vm::ptr<u32> count)

error_code sceNpBasicGetFriendListEntry(u32 index, vm::ptr<SceNpId> npid)
{
sceNp.warning("sceNpBasicGetFriendListEntry(index=%d, npid=*0x%x)", index, npid);
sceNp.trace("sceNpBasicGetFriendListEntry(index=%d, npid=*0x%x)", index, npid);

auto& nph = g_fxo->get<named_thread<np::np_handler>>();

Expand Down
12 changes: 9 additions & 3 deletions rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.h
Expand Up @@ -48,11 +48,17 @@ class lv2_socket
sys_net_linger linger;
};

struct sockopt_cache
{
sockopt_data data{};
s32 len = 0;
};

public:
SAVESTATE_INIT_POS(7); // Dependency on RPCN

lv2_socket(lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol);
lv2_socket(utils::serial&){}
lv2_socket(utils::serial&) {}
lv2_socket(utils::serial&, lv2_socket_type type);
static std::shared_ptr<lv2_socket> load(utils::serial& ar);
void save(utils::serial&, bool save_only_this_class = false);
Expand Down Expand Up @@ -94,7 +100,7 @@ class lv2_socket

virtual std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> recvfrom(s32 flags, u32 len, bool is_lock = true) = 0;
virtual std::optional<s32> sendto(s32 flags, const std::vector<u8>& buf, std::optional<sys_net_sockaddr> opt_sn_addr, bool is_lock = true) = 0;
virtual std::optional<s32> sendmsg(s32 flags, const sys_net_msghdr& msg, bool is_lock = true) = 0;
virtual std::optional<s32> sendmsg(s32 flags, const sys_net_msghdr& msg, bool is_lock = true) = 0;
RipleyTom marked this conversation as resolved.
Show resolved Hide resolved

virtual void close() = 0;
virtual s32 shutdown(s32 how) = 0;
Expand All @@ -114,7 +120,7 @@ class lv2_socket
lv2_socket(utils::serial&, bool);

shared_mutex mutex;
u32 lv2_id = 0;
s32 lv2_id = 0;

socket_type socket = 0;

Expand Down
3 changes: 3 additions & 0 deletions rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_native.cpp
Expand Up @@ -107,6 +107,9 @@ std::tuple<bool, s32, std::shared_ptr<lv2_socket>, sys_net_sockaddr> lv2_socket_
auto newsock = std::make_shared<lv2_socket_native>(family, type, protocol);
newsock->set_socket(native_socket, family, type, protocol);

// Sockets inherit non blocking behaviour from their parent
newsock->so_nbio = so_nbio;

sys_net_sockaddr ps3_addr = native_addr_to_sys_net_addr(native_addr);

return {true, 0, std::move(newsock), ps3_addr};
Expand Down
74 changes: 56 additions & 18 deletions rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp
Expand Up @@ -14,7 +14,7 @@ lv2_socket_p2p::lv2_socket_p2p(lv2_socket_family family, lv2_socket_type type, l
lv2_socket_p2p::lv2_socket_p2p(utils::serial& ar, lv2_socket_type type)
: lv2_socket(ar, type)
{
ar(port, vport);
ar(port, vport, bound_addr);

std::deque<std::pair<sys_net_sockaddr_in_p2p, std::vector<u8>>> data_dequeue{ar};

Expand All @@ -29,7 +29,7 @@ lv2_socket_p2p::lv2_socket_p2p(utils::serial& ar, lv2_socket_type type)
void lv2_socket_p2p::save(utils::serial& ar)
{
static_cast<lv2_socket*>(this)->save(ar, true);
ar(port, vport);
ar(port, vport, bound_addr);

std::deque<std::pair<sys_net_sockaddr_in_p2p, std::vector<u8>>> data_dequeue;

Expand Down Expand Up @@ -140,24 +140,31 @@ s32 lv2_socket_p2p::bind(const sys_net_sockaddr& addr)
p2p_vport++;
}
}
else
else if (pport.bound_p2p_vports.contains(p2p_vport))
{
if (pport.bound_p2p_vports.contains(p2p_vport))
// Check that all other sockets are SO_REUSEADDR or SO_REUSEPORT
auto& bound_sockets = ::at32(pport.bound_p2p_vports, p2p_vport);
if (!sys_net_helpers::all_reusable(bound_sockets))
{
return -SYS_NET_EADDRINUSE;
}
}

pport.bound_p2p_vports.insert(std::make_pair(p2p_vport, lv2_id));
bound_sockets.insert(lv2_id);
}
else
{
std::set<s32> bound_ports{lv2_id};
pport.bound_p2p_vports.insert(std::make_pair(p2p_vport, std::move(bound_ports)));
}
}
}

{
std::lock_guard lock(mutex);
port = p2p_port;
vport = p2p_vport;
socket = real_socket;
last_bound_addr = addr;
port = p2p_port;
vport = p2p_vport;
socket = real_socket;
bound_addr = psa_in_p2p->sin_addr;
}

return CELL_OK;
Expand Down Expand Up @@ -185,23 +192,41 @@ std::pair<s32, sys_net_sockaddr> lv2_socket_p2p::getsockname()
return {CELL_OK, sn_addr};
}

std::tuple<s32, lv2_socket::sockopt_data, u32> lv2_socket_p2p::getsockopt([[maybe_unused]] s32 level, [[maybe_unused]] s32 optname, [[maybe_unused]] u32 len)
std::tuple<s32, lv2_socket::sockopt_data, u32> lv2_socket_p2p::getsockopt(s32 level, s32 optname, u32 len)
{
// TODO
return {};
std::lock_guard lock(mutex);

const u64 key = (static_cast<u64>(level) << 32) | static_cast<u64>(optname);

if (!sockopts.contains(key))
{
sys_net.error("Unhandled getsockopt(level=%d, optname=%d, len=%d)", level, optname, len);
return {};
}

const auto& cache = ::at32(sockopts, key);
return {CELL_OK, cache.data, cache.len};
}

s32 lv2_socket_p2p::setsockopt(s32 level, s32 optname, const std::vector<u8>& optval)
{
// TODO
std::lock_guard lock(mutex);

int native_int = *reinterpret_cast<const be_t<s32>*>(optval.data());

if (level == SYS_NET_SOL_SOCKET && optname == SYS_NET_SO_NBIO)
{
so_nbio = native_int;
}

return {};
const u64 key = (static_cast<u64>(level) << 32) | static_cast<u64>(optname);
sockopt_cache cache{};
memcpy(&cache.data._int, optval.data(), optval.size());
cache.len = optval.size();

sockopts[key] = std::move(cache);

return CELL_OK;
}

std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> lv2_socket_p2p::recvfrom(s32 flags, u32 len, bool is_lock)
Expand Down Expand Up @@ -258,10 +283,12 @@ std::optional<s32> lv2_socket_p2p::sendto(s32 flags, const std::vector<u8>& buf,
inet_ntop(AF_INET, &native_addr.sin_addr, ip_str, sizeof(ip_str));
sys_net.trace("[P2P] Sending a packet to %s:%d:%d", ip_str, p2p_port, p2p_vport);

std::vector<u8> p2p_data(buf.size() + sizeof(u16));
std::vector<u8> p2p_data(buf.size() + VPORT_P2P_HEADER_SIZE);
const le_t<u16> p2p_vport_le = p2p_vport;
const le_t<u16> p2p_flags_le = P2P_FLAG_P2P;
memcpy(p2p_data.data(), &p2p_vport_le, sizeof(u16));
memcpy(p2p_data.data() + sizeof(u16), buf.data(), buf.size());
memcpy(p2p_data.data() + sizeof(u16), &p2p_flags_le, sizeof(u16));
memcpy(p2p_data.data() + VPORT_P2P_HEADER_SIZE, buf.data(), buf.size());

int native_flags = 0;
if (flags & SYS_NET_MSG_WAITALL)
Expand Down Expand Up @@ -307,7 +334,18 @@ void lv2_socket_p2p::close()
auto& p2p_port = ::at32(nc.list_p2p_ports, port);
{
std::lock_guard lock(p2p_port.bound_p2p_vports_mutex);
p2p_port.bound_p2p_vports.erase(vport);
if (!p2p_port.bound_p2p_vports.contains(vport))
{
return;
}

auto& bound_sockets = ::at32(p2p_port.bound_p2p_vports, vport);
bound_sockets.erase(lv2_id);

if (bound_sockets.empty())
{
p2p_port.bound_p2p_vports.erase(vport);
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.h
Expand Up @@ -41,4 +41,6 @@ class lv2_socket_p2p : public lv2_socket
u32 bound_addr = 0;
// Queue containing received packets from network_thread for SYS_NET_SOCK_DGRAM_P2P sockets
std::queue<std::pair<sys_net_sockaddr_in_p2p, std::vector<u8>>> data{};
// List of sock options
std::map<u64, sockopt_cache> sockopts;
};
87 changes: 66 additions & 21 deletions rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp
Expand Up @@ -184,7 +184,7 @@ void initialize_tcp_timeout_monitor()
g_fxo->need<named_thread<tcp_timeout_monitor>>();
}

u16 u2s_tcp_checksum(const u16* buffer, usz size)
u16 u2s_tcp_checksum(const le_t<u16>* buffer, usz size)
{
u32 cksum = 0;
while (size > 1)
Expand All @@ -202,20 +202,22 @@ u16 u2s_tcp_checksum(const u16* buffer, usz size)

std::vector<u8> generate_u2s_packet(const p2ps_encapsulated_tcp& header, const u8* data, const u32 datasize)
{
const u32 packet_size = (sizeof(u16) + sizeof(p2ps_encapsulated_tcp) + datasize);
const u32 packet_size = (VPORT_P2P_HEADER_SIZE + sizeof(p2ps_encapsulated_tcp) + datasize);
ensure(packet_size < 65535); // packet size shouldn't be bigger than possible UDP payload
std::vector<u8> packet(packet_size);
u8* packet_data = packet.data();
le_t<u16> dst_port_le = +header.dst_port;
u8* packet_data = packet.data();
le_t<u16> dst_port_le = +header.dst_port;
le_t<u16> p2p_flags_le = P2P_FLAG_P2PS;

memcpy(packet_data, &dst_port_le, sizeof(u16));
memcpy(packet_data + sizeof(u16), &header, sizeof(p2ps_encapsulated_tcp));
memcpy(packet_data + sizeof(u16), &p2p_flags_le, sizeof(u16));
memcpy(packet_data + VPORT_P2P_HEADER_SIZE, &header, sizeof(p2ps_encapsulated_tcp));
if (datasize)
memcpy(packet_data + sizeof(u16) + sizeof(p2ps_encapsulated_tcp), data, datasize);
memcpy(packet_data + VPORT_P2P_HEADER_SIZE + sizeof(p2ps_encapsulated_tcp), data, datasize);

auto* hdr_ptr = reinterpret_cast<p2ps_encapsulated_tcp*>(packet_data + sizeof(u16));
auto* hdr_ptr = reinterpret_cast<p2ps_encapsulated_tcp*>(packet_data + VPORT_P2P_HEADER_SIZE);
hdr_ptr->checksum = 0;
hdr_ptr->checksum = u2s_tcp_checksum(utils::bless<u16>(hdr_ptr), sizeof(p2ps_encapsulated_tcp) + datasize);
hdr_ptr->checksum = u2s_tcp_checksum(utils::bless<le_t<u16>>(hdr_ptr), sizeof(p2ps_encapsulated_tcp) + datasize);

return packet;
}
Expand All @@ -225,7 +227,7 @@ lv2_socket_p2ps::lv2_socket_p2ps(lv2_socket_family family, lv2_socket_type type,
{
}

lv2_socket_p2ps::lv2_socket_p2ps(socket_type socket, u16 port, u16 vport, u32 op_addr, u16 op_port, u16 op_vport, u64 cur_seq, u64 data_beg_seq)
lv2_socket_p2ps::lv2_socket_p2ps(socket_type socket, u16 port, u16 vport, u32 op_addr, u16 op_port, u16 op_vport, u64 cur_seq, u64 data_beg_seq, s32 so_nbio)
: lv2_socket_p2p(SYS_NET_AF_INET, SYS_NET_SOCK_STREAM_P2P, SYS_NET_IPPROTO_IP)
{
this->socket = socket;
Expand All @@ -236,6 +238,7 @@ lv2_socket_p2ps::lv2_socket_p2ps(socket_type socket, u16 port, u16 vport, u32 op
this->op_vport = op_vport;
this->cur_seq = cur_seq;
this->data_beg_seq = data_beg_seq;
this->so_nbio = so_nbio;
status = p2ps_stream_status::stream_connected;
}

Expand Down Expand Up @@ -410,7 +413,7 @@ bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[mayb
const u16 new_op_vport = tcp_header->src_port;
const u64 new_cur_seq = send_hdr.seq + 1;
const u64 new_data_beg_seq = send_hdr.ack;
auto sock_lv2 = std::make_shared<lv2_socket_p2ps>(socket, port, vport, new_op_addr, new_op_port, new_op_vport, new_cur_seq, new_data_beg_seq);
auto sock_lv2 = std::make_shared<lv2_socket_p2ps>(socket, port, vport, new_op_addr, new_op_port, new_op_vport, new_cur_seq, new_data_beg_seq, so_nbio);
const s32 new_sock_id = idm::import_existing<lv2_socket>(sock_lv2);
sock_lv2->set_lv2_id(new_sock_id);
const u64 key_connected = (reinterpret_cast<struct sockaddr_in*>(op_addr)->sin_addr.s_addr) | (static_cast<u64>(tcp_header->src_port) << 48) | (static_cast<u64>(tcp_header->dst_port) << 32);
Expand Down Expand Up @@ -494,6 +497,27 @@ void lv2_socket_p2ps::set_status(p2ps_stream_status new_status)
status = new_status;
}

std::pair<s32, sys_net_sockaddr> lv2_socket_p2ps::getpeername()
{
std::lock_guard lock(mutex);

if (!op_addr || !op_port || !op_vport)
{
return {-SYS_NET_ENOTCONN, {}};
}

sys_net_sockaddr res{};
sys_net_sockaddr_in_p2p* p2p_addr = reinterpret_cast<sys_net_sockaddr_in_p2p*>(&res);

p2p_addr->sin_len = sizeof(sys_net_sockaddr_in_p2p);
p2p_addr->sin_family = SYS_NET_AF_INET;
p2p_addr->sin_addr = std::bit_cast<be_t<u32>, u32>(op_addr);
p2p_addr->sin_port = op_vport;
p2p_addr->sin_vport = op_port;

return {CELL_OK, res};
}

std::tuple<bool, s32, std::shared_ptr<lv2_socket>, sys_net_sockaddr> lv2_socket_p2ps::accept(bool is_lock)
{
std::unique_lock<shared_mutex> lock(mutex, std::defer_lock);
Expand Down Expand Up @@ -572,26 +596,36 @@ s32 lv2_socket_p2ps::bind(const sys_net_sockaddr& addr)
if (p2p_vport == 0)
{
p2p_vport = 30000;
while (pport.bound_p2p_streams.contains((static_cast<u64>(p2p_vport) << 32)))
while (pport.bound_p2ps_vports.contains(p2p_vport))
{
p2p_vport++;
}
pport.bound_p2p_streams.emplace((static_cast<u64>(p2p_vport) << 32), lv2_id);
std::set<s32> bound_ports{lv2_id};
pport.bound_p2ps_vports.insert(std::make_pair(p2p_vport, std::move(bound_ports)));
}
else
{
const u64 key = (static_cast<u64>(p2p_vport) << 32);
if (pport.bound_p2p_streams.contains(key))
if (pport.bound_p2ps_vports.contains(p2p_vport))
{
return -SYS_NET_EADDRINUSE;
auto& bound_sockets = ::at32(pport.bound_p2ps_vports, p2p_vport);
if (!sys_net_helpers::all_reusable(bound_sockets))
{
return -SYS_NET_EADDRINUSE;
}

bound_sockets.insert(lv2_id);
}
else
{
std::set<s32> bound_ports{lv2_id};
pport.bound_p2ps_vports.insert(std::make_pair(p2p_vport, std::move(bound_ports)));
}
pport.bound_p2p_streams.emplace(key, lv2_id);
}

port = p2p_port;
vport = p2p_vport;
socket = real_socket;
last_bound_addr = addr;
port = p2p_port;
vport = p2p_vport;
socket = real_socket;
bound_addr = psa_in_p2p->sin_addr;
}
}

Expand Down Expand Up @@ -817,13 +851,24 @@ void lv2_socket_p2ps::close()
std::lock_guard lock(p2p_port.bound_p2p_vports_mutex);
for (auto it = p2p_port.bound_p2p_streams.begin(); it != p2p_port.bound_p2p_streams.end();)
{
if (static_cast<u32>(it->second) == lv2_id)
if (it->second == lv2_id)
{
it = p2p_port.bound_p2p_streams.erase(it);
continue;
}
it++;
}

if (p2p_port.bound_p2ps_vports.contains(vport))
{
auto& bound_ports = ::at32(p2p_port.bound_p2ps_vports, vport);
bound_ports.erase(lv2_id);

if (bound_ports.empty())
{
p2p_port.bound_p2ps_vports.erase(vport);
}
RipleyTom marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
Expand Down